Tighten config and admin typing

This commit is contained in:
Nick Sweeting
2026-03-15 19:49:52 -07:00
parent 44cabac8d0
commit 3889eb4efa
3 changed files with 108 additions and 86 deletions

View File

@@ -3,9 +3,8 @@ __package__ = 'archivebox.config'
import os
import shutil
import inspect
from typing import Any, List, Dict
from benedict import benedict
from pathlib import Path
from typing import Any, Dict
from django.http import HttpRequest
from django.utils import timezone
from django.utils.html import format_html
@@ -332,17 +331,17 @@ def plugin_detail_view(request: HttpRequest, key: str, **kwargs) -> ItemContext:
if properties_summary:
fields["Config Properties"] = mark_safe('<br/>'.join(properties_summary))
section: SectionData = {
"name": plugin['name'],
"description": plugin['path'],
"fields": fields,
"help_texts": {},
}
return ItemContext(
slug=key,
title=plugin['name'],
data=[
{
"name": plugin['name'],
"description": plugin['path'],
"fields": fields,
"help_texts": {},
},
],
data=[section],
)
@@ -369,16 +368,22 @@ def worker_list_view(request: HttpRequest, **kwargs) -> TableContext:
table=rows,
)
all_config_entries = [
benedict(config)
for config in (supervisor.getAllConfigInfo() or [])
if isinstance(config, dict) and "name" in config
]
all_config = {str(config["name"]): config for config in all_config_entries}
all_config: dict[str, dict[str, object]] = {}
config_items = supervisor.getAllConfigInfo()
if not isinstance(config_items, list):
config_items = []
for config_data in config_items:
if not isinstance(config_data, dict):
continue
config_name = config_data.get("name")
if not isinstance(config_name, str):
continue
all_config[config_name] = config_data
# Add top row for supervisord process manager
rows["Name"].append(ItemLink('supervisord', key='supervisord'))
rows["State"].append(supervisor.getState()['statename'])
supervisor_state = supervisor.getState()
rows["State"].append(str(supervisor_state.get('statename') if isinstance(supervisor_state, dict) else ''))
rows['PID'].append(str(supervisor.getPID()))
rows["Started"].append('-')
rows["Command"].append('supervisord --configuration=tmp/supervisord.conf')
@@ -392,23 +397,31 @@ def worker_list_view(request: HttpRequest, **kwargs) -> TableContext:
rows['Exit Status'].append('0')
# Add a row for each worker process managed by supervisord
for proc_data in supervisor.getAllProcessInfo():
process_items = supervisor.getAllProcessInfo()
if not isinstance(process_items, list):
process_items = []
for proc_data in process_items:
if not isinstance(proc_data, dict):
continue
proc = benedict(proc_data)
rows["Name"].append(ItemLink(proc.name, key=proc.name))
rows["State"].append(proc.statename)
rows['PID'].append(proc.description.replace('pid ', ''))
rows["Started"].append(parse_date(proc.start).strftime("%Y-%m-%d %H:%M:%S") if proc.start else '')
rows["Command"].append(all_config[proc.name].command)
proc_name = str(proc_data.get("name") or "")
proc_description = str(proc_data.get("description") or "")
proc_start = proc_data.get("start")
proc_logfile = str(proc_data.get("stdout_logfile") or "")
proc_config = all_config.get(proc_name, {})
rows["Name"].append(ItemLink(proc_name, key=proc_name))
rows["State"].append(str(proc_data.get("statename") or ""))
rows['PID'].append(proc_description.replace('pid ', ''))
rows["Started"].append(parse_date(proc_start).strftime("%Y-%m-%d %H:%M:%S") if proc_start else '')
rows["Command"].append(str(proc_config.get("command") or ""))
rows["Logfile"].append(
format_html(
'<a href="/admin/environment/logs/{}/">{}</a>',
proc.stdout_logfile.split("/")[-1].split('.')[0],
proc.stdout_logfile,
proc_logfile.split("/")[-1].split('.')[0],
proc_logfile,
)
)
rows["Exit Status"].append(str(proc.exitstatus))
rows["Exit Status"].append(str(proc_data.get("exitstatus") or ""))
return TableContext(
title="Running worker processes",
@@ -433,48 +446,51 @@ def worker_detail_view(request: HttpRequest, key: str, **kwargs) -> ItemContext:
data=[],
)
all_config = [
benedict(config)
for config in (supervisor.getAllConfigInfo() or [])
if isinstance(config, dict)
]
all_config: list[dict[str, object]] = []
config_items = supervisor.getAllConfigInfo()
if not isinstance(config_items, list):
config_items = []
for config_data in config_items:
if isinstance(config_data, dict):
all_config.append(config_data)
if key == 'supervisord':
relevant_config = CONFIG_FILE.read_text()
relevant_logs = str(supervisor.readLog(0, 10_000_000))
start_ts = [line for line in relevant_logs.split("\n") if "RPC interface 'supervisor' initialized" in line][-1].split(",", 1)[0]
uptime = str(timezone.now() - parse_date(start_ts)).split(".")[0]
supervisor_state = supervisor.getState()
proc = benedict(
{
"name": "supervisord",
"pid": supervisor.getPID(),
"statename": supervisor.getState()["statename"],
"start": start_ts,
"stop": None,
"exitstatus": "",
"stdout_logfile": "logs/supervisord.log",
"description": f'pid 000, uptime {uptime}',
}
)
proc: Dict[str, object] = {
"name": "supervisord",
"pid": supervisor.getPID(),
"statename": str(supervisor_state.get("statename") if isinstance(supervisor_state, dict) else ""),
"start": start_ts,
"stop": None,
"exitstatus": "",
"stdout_logfile": "logs/supervisord.log",
"description": f'pid 000, uptime {uptime}',
}
else:
proc = benedict(get_worker(supervisor, key) or {})
relevant_config = next((config for config in all_config if config.get('name') == key), benedict({}))
relevant_logs = str(supervisor.tailProcessStdoutLog(key, 0, 10_000_000)[0])
worker_data = get_worker(supervisor, key)
proc = worker_data if isinstance(worker_data, dict) else {}
relevant_config = next((config for config in all_config if config.get('name') == key), {})
log_result = supervisor.tailProcessStdoutLog(key, 0, 10_000_000)
relevant_logs = str(log_result[0] if isinstance(log_result, tuple) else log_result)
section: SectionData = {
"name": key,
"description": key,
"fields": {
"Command": str(proc.name),
"PID": str(proc.pid),
"State": str(proc.statename),
"Started": parse_date(proc.start).strftime("%Y-%m-%d %H:%M:%S") if proc.start else "",
"Stopped": parse_date(proc.stop).strftime("%Y-%m-%d %H:%M:%S") if proc.stop else "",
"Exit Status": str(proc.exitstatus),
"Logfile": str(proc.stdout_logfile),
"Uptime": str((proc.description or "").split("uptime ", 1)[-1]),
"Config": obj_to_yaml(dict(relevant_config)) if isinstance(relevant_config, dict) else str(relevant_config),
"Command": str(proc.get("name") or ""),
"PID": str(proc.get("pid") or ""),
"State": str(proc.get("statename") or ""),
"Started": parse_date(proc.get("start")).strftime("%Y-%m-%d %H:%M:%S") if proc.get("start") else "",
"Stopped": parse_date(proc.get("stop")).strftime("%Y-%m-%d %H:%M:%S") if proc.get("stop") else "",
"Exit Status": str(proc.get("exitstatus") or ""),
"Logfile": str(proc.get("stdout_logfile") or ""),
"Uptime": str(str(proc.get("description") or "").split("uptime ", 1)[-1]),
"Config": obj_to_yaml(relevant_config) if isinstance(relevant_config, dict) else str(relevant_config),
"Logs": relevant_logs,
},
"help_texts": {"Uptime": "How long the process has been running ([days:]hours:minutes:seconds)"},
@@ -492,8 +508,10 @@ def log_list_view(request: HttpRequest, **kwargs) -> TableContext:
assert is_superuser(request), "Must be a superuser to view configuration settings."
log_files = CONSTANTS.LOGS_DIR.glob("*.log")
log_files = sorted(log_files, key=os.path.getmtime)[::-1]
log_files: list[Path] = []
for logfile in sorted(CONSTANTS.LOGS_DIR.glob("*.log"), key=os.path.getmtime)[::-1]:
if isinstance(logfile, Path):
log_files.append(logfile)
rows = {
"Name": [],
@@ -533,20 +551,20 @@ def log_detail_view(request: HttpRequest, key: str, **kwargs) -> ItemContext:
log_text = log_file.read_text()
log_stat = log_file.stat()
section: SectionData = {
"name": key,
"description": key,
"fields": {
"Path": str(log_file),
"Size": f"{log_stat.st_size//1000} kb",
"Last Updated": parse_date(log_stat.st_mtime).strftime("%Y-%m-%d %H:%M:%S"),
"Tail": "\n".join(log_text[-10_000:].split("\n")[-20:]),
"Full Log": log_text,
},
}
return ItemContext(
slug=key,
title=key,
data=[
{
"name": key,
"description": key,
"fields": {
"Path": str(log_file),
"Size": f"{log_stat.st_size//1000} kb",
"Last Updated": parse_date(log_stat.st_mtime).strftime("%Y-%m-%d %H:%M:%S"),
"Tail": "\n".join(log_text[-10_000:].split("\n")[-20:]),
"Full Log": log_text,
},
},
],
data=[section],
)

View File

@@ -225,26 +225,29 @@ class ArchiveResultInline(admin.TabularInline):
def get_formset(self, request, obj=None, **kwargs):
formset = super().get_formset(request, obj, **kwargs)
snapshot = self.get_parent_object_from_request(request)
form_class = getattr(formset, 'form', None)
base_fields = getattr(form_class, 'base_fields', {})
snapshot_output_dir = str(snapshot.output_dir) if snapshot else ''
# import ipdb; ipdb.set_trace()
# formset.form.base_fields['id'].widget = formset.form.base_fields['id'].hidden_widget()
# default values for new entries
formset.form.base_fields['status'].initial = 'succeeded'
formset.form.base_fields['start_ts'].initial = timezone.now()
formset.form.base_fields['end_ts'].initial = timezone.now()
formset.form.base_fields['cmd_version'].initial = '-'
formset.form.base_fields['pwd'].initial = str(snapshot.output_dir)
formset.form.base_fields['cmd'].initial = '["-"]'
formset.form.base_fields['output_str'].initial = 'Manually recorded cmd output...'
base_fields['status'].initial = 'succeeded'
base_fields['start_ts'].initial = timezone.now()
base_fields['end_ts'].initial = timezone.now()
base_fields['cmd_version'].initial = '-'
base_fields['pwd'].initial = snapshot_output_dir
base_fields['cmd'].initial = '["-"]'
base_fields['output_str'].initial = 'Manually recorded cmd output...'
if obj is not None:
# hidden values for existing entries and new entries
formset.form.base_fields['start_ts'].widget = formset.form.base_fields['start_ts'].hidden_widget()
formset.form.base_fields['end_ts'].widget = formset.form.base_fields['end_ts'].hidden_widget()
formset.form.base_fields['cmd'].widget = formset.form.base_fields['cmd'].hidden_widget()
formset.form.base_fields['pwd'].widget = formset.form.base_fields['pwd'].hidden_widget()
formset.form.base_fields['cmd_version'].widget = formset.form.base_fields['cmd_version'].hidden_widget()
base_fields['start_ts'].widget = base_fields['start_ts'].hidden_widget()
base_fields['end_ts'].widget = base_fields['end_ts'].hidden_widget()
base_fields['cmd'].widget = base_fields['cmd'].hidden_widget()
base_fields['pwd'].widget = base_fields['pwd'].hidden_widget()
base_fields['cmd_version'].widget = base_fields['cmd_version'].hidden_widget()
return formset
def get_readonly_fields(self, request, obj=None):

View File

@@ -205,8 +205,9 @@ class SnapshotAdmin(SearchResultsAdminMixin, ConfigEditorMixin, BaseModelAdmin):
actions = super().get_actions(request)
if not actions:
return {}
if 'delete_selected' in actions:
func, name, _desc = actions['delete_selected']
delete_selected = actions.get('delete_selected')
if delete_selected:
func, name, _desc = delete_selected
actions['delete_selected'] = (func, name, 'Delete')
return actions