WIP: checkpoint working tree before rebasing onto dev

This commit is contained in:
Nick Sweeting
2026-03-22 20:23:45 -07:00
parent a6548df8d0
commit f400a2cd67
87 changed files with 12607 additions and 1808 deletions

View File

@@ -1,2 +1,169 @@
__package__ = "archivebox.personas"
# Register your models here.
import shutil
from django.contrib import admin, messages
from django.utils.html import format_html, format_html_join
from archivebox.base_models.admin import BaseModelAdmin, ConfigEditorMixin
from archivebox.personas.forms import PersonaAdminForm
from archivebox.personas.importers import discover_local_browser_profiles
from archivebox.personas.models import Persona
class PersonaAdmin(ConfigEditorMixin, BaseModelAdmin):
    """Admin UI for Persona rows.

    Adds list columns showing which on-disk artifacts exist, read-only
    path/artifact panels, browser-import controls (provided by
    PersonaAdminForm), and a save hook that renames the persona's on-disk
    directory when its name changes and then runs the requested import.
    """

    form = PersonaAdminForm
    change_form_template = "admin/personas/persona/change_form.html"
    list_display = ("name", "created_by", "created_at", "chrome_profile_state", "cookies_state", "auth_state")
    search_fields = ("name", "created_by__username")
    list_filter = ("created_by",)
    ordering = ["name"]
    list_per_page = 100
    readonly_fields = ("id", "created_at", "persona_paths", "import_artifact_status")
    # Add page: identity, one-shot browser-import options, and raw config.
    add_fieldsets = (
        ("Persona", {
            "fields": ("name", "created_by"),
            "classes": ("card",),
        }),
        ("Browser Import", {
            "fields": (
                "import_mode",
                "import_discovered_profile",
                "import_source",
                "import_profile_name",
                "import_copy_profile",
                "import_extract_cookies",
                "import_capture_storage",
            ),
            "classes": ("card", "wide"),
        }),
        ("Advanced", {
            "fields": ("config",),
            "classes": ("card", "wide"),
        }),
    )
    # Change page: everything from the add page plus read-only artifact and
    # timestamp panels (selected in get_fieldsets below).
    change_fieldsets = add_fieldsets + (
        ("Artifacts", {
            "fields": ("persona_paths", "import_artifact_status"),
            "classes": ("card", "wide"),
        }),
        ("Timestamps", {
            "fields": ("id", "created_at"),
            "classes": ("card",),
        }),
    )

    @admin.display(description="Chrome Profile")
    def chrome_profile_state(self, obj: Persona) -> str:
        # List column: does a copied chrome_user_data tree exist under the persona dir?
        return "yes" if (obj.path / "chrome_user_data").exists() else "no"

    @admin.display(description="cookies.txt")
    def cookies_state(self, obj: Persona) -> str:
        # Truthiness of COOKIES_FILE stands in for "cookies.txt exists".
        return "yes" if obj.COOKIES_FILE else "no"

    @admin.display(description="auth.json")
    def auth_state(self, obj: Persona) -> str:
        return "yes" if obj.AUTH_STORAGE_FILE else "no"

    @admin.display(description="Persona Paths")
    def persona_paths(self, obj: Persona) -> str:
        # Read-only panel listing every well-known path under this persona.
        # For cookies/auth, falls back to the would-be path when the file is absent.
        return format_html(
            "<div class='abx-persona-path-list'>"
            "<div><strong>Persona root</strong><code>{}</code></div>"
            "<div><strong>chrome_user_data</strong><code>{}</code></div>"
            "<div><strong>chrome_extensions</strong><code>{}</code></div>"
            "<div><strong>chrome_downloads</strong><code>{}</code></div>"
            "<div><strong>cookies.txt</strong><code>{}</code></div>"
            "<div><strong>auth.json</strong><code>{}</code></div>"
            "</div>",
            obj.path,
            obj.CHROME_USER_DATA_DIR,
            obj.CHROME_EXTENSIONS_DIR,
            obj.CHROME_DOWNLOADS_DIR,
            obj.COOKIES_FILE or (obj.path / "cookies.txt"),
            obj.AUTH_STORAGE_FILE or (obj.path / "auth.json"),
        )

    @admin.display(description="Import Artifacts")
    def import_artifact_status(self, obj: Persona) -> str:
        # Present/missing badge per artifact; format_html_join escapes each value.
        entries = [
            ("Browser profile", (obj.path / "chrome_user_data").exists(), obj.CHROME_USER_DATA_DIR),
            ("cookies.txt", bool(obj.COOKIES_FILE), obj.COOKIES_FILE or (obj.path / "cookies.txt")),
            ("auth.json", bool(obj.AUTH_STORAGE_FILE), obj.AUTH_STORAGE_FILE or (obj.path / "auth.json")),
        ]
        return format_html(
            "<div class='abx-persona-artifacts'>{}</div>",
            format_html_join(
                "",
                "<div class='abx-persona-artifact'><strong>{}</strong><span class='{}'>{}</span><code>{}</code></div>",
                (
                    (
                        label,
                        "abx-artifact-state abx-artifact-state--yes" if enabled else "abx-artifact-state abx-artifact-state--no",
                        "present" if enabled else "missing",
                        path,
                    )
                    for label, enabled, path in entries
                ),
            ),
        )

    def get_fieldsets(self, request, obj=None):
        # obj is None on the add page, an existing Persona on the change page.
        return self.change_fieldsets if obj else self.add_fieldsets

    def render_change_form(self, request, context, add=False, change=False, form_url="", obj=None):
        # Expose how many local browser profiles were auto-detected so the
        # custom change_form template can show/hide the discovery UI.
        context["detected_profile_count"] = len(discover_local_browser_profiles())
        return super().render_change_form(request, context, add=add, change=change, form_url=form_url, obj=obj)

    def save_model(self, request, obj, form, change):
        """Save the row, move the on-disk dir if the name changed, then run imports.

        Ordering matters: the DB row is saved first, the directory is renamed
        second, ensure_dirs() recreates any missing subdirs, and only then is
        the form's optional browser import executed against the final paths.
        """
        old_path = None
        new_path = None
        if change:
            # Capture the pre-save path so the directory can be moved after saving.
            previous = Persona.objects.get(pk=obj.pk)
            if previous.name != obj.name:
                old_path = previous.path
                new_path = obj.path
        super().save_model(request, obj, form, change)
        if old_path and new_path and old_path != new_path and old_path.exists():
            # NOTE(review): raising here surfaces as an unhandled error after the
            # row has already been saved under the new name — confirm this is
            # intended rather than a messages.error + early return.
            if new_path.exists():
                raise FileExistsError(f"Cannot rename Persona directory because the destination already exists: {new_path}")
            shutil.move(str(old_path), str(new_path))
        obj.ensure_dirs()
        # apply_import returns None when the form requested no import.
        import_result = form.apply_import(obj)
        if import_result is None:
            return
        completed_actions = []
        if import_result.profile_copied:
            completed_actions.append("profile copied")
        if import_result.cookies_imported:
            completed_actions.append("cookies.txt generated")
        if import_result.storage_captured:
            completed_actions.append("auth.json captured")
        if import_result.user_agent_imported:
            completed_actions.append("USER_AGENT copied")
        if completed_actions:
            messages.success(
                request,
                f'Imported {", ".join(completed_actions)} from {import_result.source.display_label}.',
            )
        else:
            messages.warning(
                request,
                f"Persona saved, but no browser artifacts were imported from {import_result.source.display_label}.",
            )
        # Surface non-fatal importer warnings individually.
        for warning in import_result.warnings:
            messages.warning(request, warning)
def register_admin(admin_site: admin.AdminSite) -> None:
    """Attach the Persona model admin to the given admin site."""
    admin_site.register(Persona, PersonaAdmin)

View File

@@ -0,0 +1,210 @@
#!/usr/bin/env node
/**
* Export cookies and open-tab storage from a Chromium profile or live CDP URL.
*
* Environment variables:
* ARCHIVEBOX_ABX_PLUGINS_DIR Absolute path to abx_plugins/plugins
* CHROME_USER_DATA_DIR Local Chromium user-data directory to launch
* CHROME_CDP_URL Existing browser CDP URL to attach to
* COOKIES_OUTPUT_FILE Optional output path for Netscape cookies.txt
* AUTH_STORAGE_OUTPUT_FILE Optional output path for auth.json
* CHROME_BINARY Optional browser binary override
* NODE_MODULES_DIR Optional node_modules path for puppeteer-core
*/
const fs = require('fs');
const os = require('os');
const path = require('path');
// Resolve the shared plugins directory from env (new name first, legacy
// ABX_PLUGINS_DIR as fallback) and bail out early if neither is set.
const pluginsDir = process.env.ARCHIVEBOX_ABX_PLUGINS_DIR || process.env.ABX_PLUGINS_DIR;
if (!pluginsDir) {
  console.error('ARCHIVEBOX_ABX_PLUGINS_DIR is required');
  process.exit(1);
}
const baseUtils = require(path.join(pluginsDir, 'base', 'utils.js'));
// Presumably wires up module resolution so puppeteer-core can be found
// (see NODE_MODULES_DIR in the header) — confirm in base/utils.js.
baseUtils.ensureNodeModuleResolution(module);
const chromeUtils = require(path.join(pluginsDir, 'chrome', 'chrome_utils.js'));
const puppeteer = require('puppeteer-core');
function cookieToNetscape(cookie) {
let domain = cookie.domain;
if (!domain.startsWith('.') && !cookie.hostOnly) {
domain = '.' + domain;
}
const includeSubdomains = domain.startsWith('.') ? 'TRUE' : 'FALSE';
const cookiePath = cookie.path || '/';
const secure = cookie.secure ? 'TRUE' : 'FALSE';
const expiry = cookie.expires && cookie.expires > 0 ? Math.floor(cookie.expires).toString() : '0';
return `${domain}\t${includeSubdomains}\t${cookiePath}\t${secure}\t${expiry}\t${cookie.name}\t${cookie.value}`;
}
// Write cookies to outputPath in Netscape cookies.txt format, creating
// any missing parent directories first.
function writeCookiesFile(cookies, outputPath) {
  const header = [
    '# Netscape HTTP Cookie File',
    '# https://curl.se/docs/http-cookies.html',
    '# This file was generated by ArchiveBox persona cookie extraction',
    '#',
    '# Format: domain\\tincludeSubdomains\\tpath\\tsecure\\texpiry\\tname\\tvalue',
    '',
  ];
  const body = cookies.map(cookieToNetscape);
  fs.mkdirSync(path.dirname(outputPath), { recursive: true });
  fs.writeFileSync(outputPath, header.concat(body).join('\n') + '\n');
}
// Gather localStorage/sessionStorage for every inspectable open tab,
// returned as { localStorage: {origin: {...}}, sessionStorage: {origin: {...}} }.
async function collectStorage(browser) {
  const localStorage = {};
  const sessionStorage = {};
  const openPages = await browser.pages();
  for (const page of openPages) {
    try {
      const pageUrl = page.url();
      // Skip blank tabs and internal browser pages that have no useful storage.
      if (!pageUrl || pageUrl === 'about:blank') continue;
      if (pageUrl.startsWith('chrome:') || pageUrl.startsWith('edge:') || pageUrl.startsWith('devtools:')) continue;
      const snapshot = await page.evaluate(() => ({
        origin: window.location.origin,
        localStorage: Object.fromEntries(Object.entries(window.localStorage)),
        sessionStorage: Object.fromEntries(Object.entries(window.sessionStorage)),
      }));
      if (!snapshot.origin || snapshot.origin === 'null') continue;
      const local = snapshot.localStorage || {};
      const session = snapshot.sessionStorage || {};
      if (Object.keys(local).length > 0) localStorage[snapshot.origin] = local;
      if (Object.keys(session).length > 0) sessionStorage[snapshot.origin] = session;
    } catch (error) {
      // Some pages refuse evaluate() (crashed tabs, restricted schemes); skip them.
    }
  }
  return { localStorage, sessionStorage };
}
/**
 * Open a browser connection for state export.
 *
 * Prefers attaching to an existing browser via CHROME_CDP_URL; otherwise
 * launches a headless Chromium against CHROME_USER_DATA_DIR. Returns
 * { browser, cleanup, sourceDescription } — callers must always await
 * cleanup(), which handles disconnect (and, for launched browsers, process
 * kill and scratch-dir removal).
 */
async function openBrowser() {
  const cdpUrl = process.env.CHROME_CDP_URL || '';
  if (cdpUrl) {
    // Attach-only mode: we do not own the browser process, so cleanup
    // just disconnects the puppeteer client.
    const browser = await chromeUtils.connectToBrowserEndpoint(puppeteer, cdpUrl, { defaultViewport: null });
    return {
      browser,
      async cleanup() {
        try {
          await browser.disconnect();
        } catch (error) {}
      },
      sourceDescription: cdpUrl,
    };
  }
  const userDataDir = process.env.CHROME_USER_DATA_DIR;
  if (!userDataDir) {
    throw new Error('Either CHROME_USER_DATA_DIR or CHROME_CDP_URL is required');
  }
  if (!fs.existsSync(userDataDir)) {
    throw new Error(`User data directory does not exist: ${userDataDir}`);
  }
  // Scratch dir handed to launchChromium; removed again in cleanup().
  const outputDir = fs.mkdtempSync(path.join(os.tmpdir(), 'abx-browser-state-'));
  const binary = process.env.CHROME_BINARY || chromeUtils.findAnyChromiumBinary();
  if (!binary) {
    throw new Error('Could not find a Chromium binary for browser state export');
  }
  const launched = await chromeUtils.launchChromium({
    binary,
    outputDir,
    userDataDir,
    headless: true,
    // killZombies: false — presumably avoids killing other Chrome processes
    // sharing this profile; confirm semantics in chrome_utils.js.
    killZombies: false,
  });
  if (!launched.success) {
    throw new Error(launched.error || 'Chrome launch failed');
  }
  const browser = await chromeUtils.connectToBrowserEndpoint(puppeteer, launched.cdpUrl, { defaultViewport: null });
  return {
    browser,
    async cleanup() {
      // Launched mode owns the process: disconnect, kill, remove scratch dir.
      // Each step is best-effort so one failure doesn't block the others.
      try {
        await browser.disconnect();
      } catch (error) {}
      try {
        await chromeUtils.killChrome(launched.pid, outputDir);
      } catch (error) {}
      try {
        fs.rmSync(outputDir, { recursive: true, force: true });
      } catch (error) {}
    },
    sourceDescription: userDataDir,
  };
}
/**
 * Export cookies and/or auth storage as configured by environment variables.
 * At least one of COOKIES_OUTPUT_FILE / AUTH_STORAGE_OUTPUT_FILE is required.
 */
async function main() {
  const cookiesOutput = process.env.COOKIES_OUTPUT_FILE || '';
  const authOutput = process.env.AUTH_STORAGE_OUTPUT_FILE || '';
  if (!cookiesOutput && !authOutput) {
    throw new Error('COOKIES_OUTPUT_FILE or AUTH_STORAGE_OUTPUT_FILE is required');
  }
  const { browser, cleanup, sourceDescription } = await openBrowser();
  try {
    // Browser-level CDP session: used for version/user-agent and cookie dump.
    const session = await browser.target().createCDPSession();
    const browserVersion = await session.send('Browser.getVersion');
    const cookieResult = await session.send('Storage.getCookies');
    const cookies = cookieResult?.cookies || [];
    const { localStorage, sessionStorage } = await collectStorage(browser);
    const userAgent = browserVersion?.userAgent || '';
    if (cookiesOutput) {
      writeCookiesFile(cookies, cookiesOutput);
    }
    if (authOutput) {
      // auth.json bundles cookies + per-origin storage + the UA string so a
      // later archiving run can replay the same identity.
      fs.mkdirSync(path.dirname(authOutput), { recursive: true });
      fs.writeFileSync(
        authOutput,
        JSON.stringify(
          {
            TYPE: 'auth',
            SOURCE: sourceDescription,
            captured_at: new Date().toISOString(),
            user_agent: userAgent,
            cookies,
            localStorage,
            sessionStorage,
          },
          null,
          2,
        ) + '\n',
      );
    }
    // Progress/status goes to stderr so stdout stays clean for callers.
    console.error(
      `[+] Exported ${cookies.length} cookies` +
      `${authOutput ? ` and ${Object.keys(localStorage).length + Object.keys(sessionStorage).length} storage origins` : ''}` +
      `${userAgent ? ' with browser USER_AGENT' : ''}` +
      ` from ${sourceDescription}`,
    );
  } finally {
    await cleanup();
  }
}
// Entry point: any failure is reported on stderr and exits non-zero.
main().catch((error) => {
  console.error(`ERROR: ${error.message}`);
  process.exit(1);
});

View File

@@ -0,0 +1,176 @@
__package__ = "archivebox.personas"
from typing import Any
from django import forms
from django.utils.safestring import mark_safe
from archivebox.personas.importers import (
PersonaImportResult,
PersonaImportSource,
discover_local_browser_profiles,
import_persona_from_source,
resolve_custom_import_source,
validate_persona_name,
)
from archivebox.personas.models import Persona
def _mode_label(title: str, description: str) -> str:
    """Build the HTML label for one import-mode radio option (bold title + helper text)."""
    markup = (
        '<span class="abx-import-mode-option">'
        f"<strong>{title}</strong><span>{description}</span>"
        "</span>"
    )
    # Callers pass trusted literal strings only; mark_safe skips autoescaping.
    return mark_safe(markup)
class PersonaAdminForm(forms.ModelForm):
    """ModelForm for Persona plus optional one-shot browser-import controls.

    The import_* fields are not model fields: clean() resolves them into a
    PersonaImportSource stored on self._resolved_import_source, and the admin
    calls apply_import() after the row has been saved.
    """

    import_mode = forms.ChoiceField(
        required=False,
        initial="none",
        label="Bootstrap this persona",
        widget=forms.RadioSelect,
        choices=(
            ("none", _mode_label("Blank Persona", "Create the persona without importing browser state yet.")),
            ("discovered", _mode_label("Use a detected profile", "Pick from Chromium profiles auto-discovered on this host.")),
            ("custom", _mode_label("Use a custom path or CDP URL", "Paste an absolute Chromium path or attach to a live browser debugging endpoint.")),
        ),
        help_text="These options run after the Persona row is saved, using the same backend import helpers as the CLI.",
    )
    # Choices populated per-instance in __init__ from discover_local_browser_profiles().
    import_discovered_profile = forms.ChoiceField(
        required=False,
        label="Autodiscovered profiles",
        widget=forms.RadioSelect,
        choices=(),
        help_text="Detected from local Chrome, Chromium, Brave, and Edge profile roots.",
    )
    import_source = forms.CharField(
        required=False,
        label="Absolute path or CDP URL",
        widget=forms.TextInput(
            attrs={
                "placeholder": "/Users/alice/Library/Application Support/Google/Chrome or ws://127.0.0.1:9222/devtools/browser/...",
                "style": "width: 100%; font-family: monospace;",
            }
        ),
        help_text="Accepts an absolute Chromium user-data dir, an exact profile dir, or a live HTTP/WS CDP endpoint.",
    )
    import_profile_name = forms.CharField(
        required=False,
        label="Profile directory name",
        widget=forms.TextInput(
            attrs={
                "placeholder": "Default or Profile 1",
                "style": "width: 100%; font-family: monospace;",
            }
        ),
        help_text="Only used when the custom path points at a browser root containing multiple profiles.",
    )
    import_copy_profile = forms.BooleanField(
        required=False,
        initial=True,
        label="Copy browser profile into this persona",
        help_text="Copies the chosen Chromium user-data tree into `chrome_user_data` for future archiving runs.",
    )
    import_extract_cookies = forms.BooleanField(
        required=False,
        initial=True,
        label="Generate `cookies.txt`",
        help_text="Extracts cookies through Chrome DevTools Protocol and writes a Netscape cookie jar for wget/curl-based plugins.",
    )
    import_capture_storage = forms.BooleanField(
        required=False,
        initial=True,
        label="Capture open-tab storage into `auth.json`",
        help_text="Snapshots currently open tab `localStorage` / `sessionStorage` values by origin. This is most useful for live CDP imports.",
    )

    class Meta:
        model = Persona
        fields = ("name", "created_by", "config")

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        # NOTE: discovery scans the local filesystem on every form instantiation.
        self.discovered_profiles = discover_local_browser_profiles()
        self._resolved_import_source: PersonaImportSource | None = None
        self.fields["import_mode"].widget.attrs["class"] = "abx-import-mode"
        self.fields["import_discovered_profile"].widget.attrs["class"] = "abx-profile-picker"
        if self.discovered_profiles:
            # choice_value is a JSON blob round-tripped by PersonaImportSource.from_choice_value.
            self.fields["import_discovered_profile"].choices = [
                (profile.choice_value, profile.as_choice_label()) for profile in self.discovered_profiles
            ]
        else:
            self.fields["import_discovered_profile"].choices = []
            self.fields["import_discovered_profile"].help_text = (
                "No local Chromium profiles were detected on this host right now. "
                "Use the custom path/CDP option if the browser data lives elsewhere."
            )

    def clean_name(self) -> str:
        # The name becomes an on-disk directory name, so reject traversal characters.
        name = str(self.cleaned_data.get("name") or "").strip()
        is_valid, error_message = validate_persona_name(name)
        if not is_valid:
            raise forms.ValidationError(error_message)
        return name

    def clean(self) -> dict[str, Any]:
        """Resolve the chosen import mode into self._resolved_import_source.

        Adds field-level errors instead of raising where possible; leaves the
        resolved source as None when mode is "none" or resolution fails.
        """
        cleaned_data = super().clean()
        self._resolved_import_source = None
        import_mode = str(cleaned_data.get("import_mode") or "none").strip() or "none"
        if import_mode == "none":
            return cleaned_data
        if import_mode == "discovered":
            selection = str(cleaned_data.get("import_discovered_profile") or "").strip()
            if not selection:
                self.add_error("import_discovered_profile", "Choose one of the discovered profiles to import.")
                return cleaned_data
            try:
                self._resolved_import_source = PersonaImportSource.from_choice_value(selection)
            except ValueError as err:
                self.add_error("import_discovered_profile", str(err))
                return cleaned_data
        elif import_mode == "custom":
            raw_value = str(cleaned_data.get("import_source") or "").strip()
            if not raw_value:
                self.add_error("import_source", "Provide an absolute Chromium profile path or a CDP URL.")
                return cleaned_data
            try:
                self._resolved_import_source = resolve_custom_import_source(
                    raw_value,
                    profile_dir=str(cleaned_data.get("import_profile_name") or "").strip() or None,
                )
            except ValueError as err:
                self.add_error("import_source", str(err))
                return cleaned_data
        else:
            self.add_error("import_mode", "Choose how this Persona should be bootstrapped.")
            return cleaned_data
        # A source was resolved; verify at least one compatible action is selected.
        copy_profile = bool(cleaned_data.get("import_copy_profile"))
        import_cookies = bool(cleaned_data.get("import_extract_cookies"))
        capture_storage = bool(cleaned_data.get("import_capture_storage"))
        if self._resolved_import_source.kind == "cdp":
            # Remote endpoints have no local files to copy.
            if not (import_cookies or capture_storage):
                self.add_error(
                    "import_extract_cookies",
                    "CDP imports can only capture cookies and/or open-tab storage. Profile copying is not available for a remote browser endpoint.",
                )
        elif not (copy_profile or import_cookies or capture_storage):
            raise forms.ValidationError("Select at least one import action.")
        return cleaned_data

    def apply_import(self, persona: Persona) -> PersonaImportResult | None:
        """Run the resolved import against *persona*; None when no import was requested."""
        if not self._resolved_import_source:
            return None
        return import_persona_from_source(
            persona,
            self._resolved_import_source,
            copy_profile=bool(self.cleaned_data.get("import_copy_profile")),
            import_cookies=bool(self.cleaned_data.get("import_extract_cookies")),
            capture_storage=bool(self.cleaned_data.get("import_capture_storage")),
        )

View File

@@ -0,0 +1,845 @@
"""
Shared persona browser discovery/import helpers.
These helpers are used by both the CLI and the Django admin so Persona import
behavior stays consistent regardless of where it is triggered from.
"""
from __future__ import annotations
import json
import os
import platform
import shutil
import subprocess
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Optional
from urllib.parse import urlparse
from django.utils.html import format_html
from django.utils.safestring import SafeString
if TYPE_CHECKING:
from archivebox.personas.models import Persona
# Human-readable names for the browser identifiers used throughout this module.
BROWSER_LABELS = {
    "chrome": "Google Chrome",
    "chromium": "Chromium",
    "brave": "Brave",
    "edge": "Microsoft Edge",
    "custom": "Custom Path",
    "persona": "Persona Template",
}
# Names/prefixes of Chromium profile directories inside a user-data root
# ("Profile " is a prefix: Profile 1, Profile 2, ...).
BROWSER_PROFILE_DIR_NAMES = (
    "Default",
    "Profile ",
    "Guest Profile",
)
# Cache/lock/telemetry entries excluded when copying a profile tree —
# presumably matched as glob patterns by the copy helper (note "*.log");
# confirm against copy_browser_user_data_dir.
VOLATILE_PROFILE_COPY_PATTERNS = (
    "Cache",
    "Code Cache",
    "GPUCache",
    "ShaderCache",
    "Service Worker",
    "GCM Store",
    "*.log",
    "Crashpad",
    "BrowserMetrics",
    "BrowserMetrics-spare.pma",
    "SingletonLock",
    "SingletonSocket",
    "SingletonCookie",
)
# Subdirectory names inside an existing persona dir that may hold a
# Chromium user-data tree (used by persona-template discovery).
PERSONA_PROFILE_DIR_CANDIDATES = (
    "chrome_profile",
    "chrome_user_data",
)
@dataclass(frozen=True)
class PersonaImportSource:
    """Immutable description of where persona browser state comes from.

    kind is "browser-profile" (a local Chromium user-data dir + profile)
    or "cdp" (a live DevTools endpoint identified by cdp_url).
    """

    kind: str
    browser: str = "custom"
    source_name: str | None = None
    user_data_dir: Path | None = None
    profile_dir: str | None = None
    browser_binary: str | None = None
    cdp_url: str | None = None

    @property
    def browser_label(self) -> str:
        """Human-readable browser name, falling back to a title-cased id."""
        return BROWSER_LABELS.get(self.browser, self.browser.title())

    @property
    def profile_path(self) -> Path | None:
        """Full path to the selected profile dir, or None if either part is unset."""
        if self.user_data_dir and self.profile_dir:
            return self.user_data_dir / self.profile_dir
        return None

    @property
    def display_label(self) -> str:
        """Short human-facing description used in admin messages and choices."""
        if self.kind == "cdp":
            return self.cdp_url or "CDP URL"
        pieces = [self.browser_label]
        if self.source_name:
            pieces.append(f": {self.source_name}")
        if self.profile_dir:
            pieces.append(f" / {self.profile_dir}")
        return "".join(pieces)

    @property
    def choice_value(self) -> str:
        """Stable JSON encoding used as the form <input> value (see from_choice_value)."""
        payload = {
            "kind": self.kind,
            "browser": self.browser,
            "source_name": self.source_name or "",
            "user_data_dir": str(self.user_data_dir) if self.user_data_dir else "",
            "profile_dir": self.profile_dir or "",
            "browser_binary": self.browser_binary or "",
            "cdp_url": self.cdp_url or "",
        }
        return json.dumps(payload, sort_keys=True)

    def as_choice_label(self) -> SafeString:
        """Rich HTML label for the admin radio picker (values escaped by format_html)."""
        shown_path = str(self.profile_path or self.user_data_dir or self.cdp_url or "")
        if self.browser_binary:
            binary_note = f"Using {self.browser_binary}"
        else:
            binary_note = "Will auto-detect a Chromium binary"
        return format_html(
            '<span class="abx-profile-option">'
            '<strong>{}</strong>'
            '<span class="abx-profile-option__meta">{}</span>'
            '<code>{}</code>'
            "</span>",
            self.display_label,
            binary_note,
            shown_path,
        )

    @classmethod
    def from_choice_value(cls, value: str) -> "PersonaImportSource":
        """Decode and re-validate a choice_value JSON blob (inverse of choice_value)."""
        try:
            payload = json.loads(value)
        except json.JSONDecodeError as err:
            raise ValueError("Invalid discovered profile selection.") from err
        if payload.get("kind") != "browser-profile":
            raise ValueError("Invalid discovered profile selection.")
        # Re-resolve rather than trust the submitted payload blindly.
        return resolve_browser_profile_source(
            browser=str(payload.get("browser") or "custom").strip().lower() or "custom",
            source_name=str(payload.get("source_name") or "").strip() or None,
            user_data_dir=Path(str(payload.get("user_data_dir") or "")).expanduser(),
            profile_dir=str(payload.get("profile_dir") or "").strip(),
            browser_binary=str(payload.get("browser_binary") or "").strip() or None,
        )
@dataclass
class PersonaImportResult:
    """Mutable record of what a persona import actually accomplished."""

    source: PersonaImportSource
    profile_copied: bool = False
    cookies_imported: bool = False
    storage_captured: bool = False
    user_agent_imported: bool = False
    warnings: list[str] = field(default_factory=list)

    @property
    def did_work(self) -> bool:
        """True when at least one artifact was imported."""
        return any((
            self.profile_copied,
            self.cookies_imported,
            self.storage_captured,
            self.user_agent_imported,
        ))
def get_chrome_user_data_dir() -> Optional[Path]:
    """Get the default Chrome user data directory for the current platform."""
    home = Path.home()
    system = platform.system()
    if system == "Darwin":
        candidates = [
            home / "Library" / "Application Support" / "Google" / "Chrome",
            home / "Library" / "Application Support" / "Chromium",
        ]
    elif system == "Linux":
        candidates = [
            home / ".config" / "google-chrome",
            home / ".config" / "chromium",
            home / ".config" / "chrome",
            home / "snap" / "chromium" / "common" / "chromium",
        ]
    elif system == "Windows":
        local_app_data = Path(os.environ.get("LOCALAPPDATA", home / "AppData" / "Local"))
        candidates = [
            local_app_data / "Google" / "Chrome" / "User Data",
            local_app_data / "Chromium" / "User Data",
        ]
    else:
        candidates = []
    # First root that exists and actually contains at least one profile wins.
    for root in candidates:
        if root.exists() and _list_profile_names(root):
            return root
    return None
def get_brave_user_data_dir() -> Optional[Path]:
    """Get the default Brave user data directory for the current platform."""
    home = Path.home()
    system = platform.system()
    if system == "Darwin":
        candidates = [
            home / "Library" / "Application Support" / "BraveSoftware" / "Brave-Browser",
        ]
    elif system == "Linux":
        candidates = [
            home / ".config" / "BraveSoftware" / "Brave-Browser",
        ]
    elif system == "Windows":
        local_app_data = Path(os.environ.get("LOCALAPPDATA", home / "AppData" / "Local"))
        candidates = [
            local_app_data / "BraveSoftware" / "Brave-Browser" / "User Data",
        ]
    else:
        candidates = []
    # First root that exists and actually contains at least one profile wins.
    for root in candidates:
        if root.exists() and _list_profile_names(root):
            return root
    return None
def get_edge_user_data_dir() -> Optional[Path]:
    """Get the default Edge user data directory for the current platform."""
    home = Path.home()
    system = platform.system()
    if system == "Darwin":
        candidates = [
            home / "Library" / "Application Support" / "Microsoft Edge",
        ]
    elif system == "Linux":
        candidates = [
            home / ".config" / "microsoft-edge",
            home / ".config" / "microsoft-edge-beta",
            home / ".config" / "microsoft-edge-dev",
        ]
    elif system == "Windows":
        local_app_data = Path(os.environ.get("LOCALAPPDATA", home / "AppData" / "Local"))
        candidates = [
            local_app_data / "Microsoft" / "Edge" / "User Data",
        ]
    else:
        candidates = []
    # First root that exists and actually contains at least one profile wins.
    for root in candidates:
        if root.exists() and _list_profile_names(root):
            return root
    return None
def get_browser_binary(browser: str) -> Optional[str]:
    """Return the first existing known binary path for *browser*, or None.

    Known browsers: chrome, chromium, brave, edge. Unknown names and
    unsupported platforms simply yield None.
    """
    browser = browser.lower()
    home = Path.home()
    system = platform.system()
    if system == "Darwin":
        known = {
            "chrome": ["/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"],
            "chromium": ["/Applications/Chromium.app/Contents/MacOS/Chromium"],
            "brave": ["/Applications/Brave Browser.app/Contents/MacOS/Brave Browser"],
            "edge": ["/Applications/Microsoft Edge.app/Contents/MacOS/Microsoft Edge"],
        }
    elif system == "Linux":
        known = {
            "chrome": ["/usr/bin/google-chrome", "/usr/bin/google-chrome-stable", "/usr/bin/google-chrome-beta", "/usr/bin/google-chrome-unstable"],
            "chromium": ["/usr/bin/chromium", "/usr/bin/chromium-browser"],
            "brave": ["/usr/bin/brave-browser", "/usr/bin/brave-browser-beta", "/usr/bin/brave-browser-nightly"],
            "edge": ["/usr/bin/microsoft-edge", "/usr/bin/microsoft-edge-stable", "/usr/bin/microsoft-edge-beta", "/usr/bin/microsoft-edge-dev"],
        }
    elif system == "Windows":
        local_app_data = Path(os.environ.get("LOCALAPPDATA", home / "AppData" / "Local"))
        known = {
            "chrome": [
                str(local_app_data / "Google" / "Chrome" / "Application" / "chrome.exe"),
                "C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe",
                "C:\\Program Files (x86)\\Google\\Chrome\\Application\\chrome.exe",
            ],
            "chromium": [str(local_app_data / "Chromium" / "Application" / "chrome.exe")],
            "brave": [
                str(local_app_data / "BraveSoftware" / "Brave-Browser" / "Application" / "brave.exe"),
                "C:\\Program Files\\BraveSoftware\\Brave-Browser\\Application\\brave.exe",
                "C:\\Program Files (x86)\\BraveSoftware\\Brave-Browser\\Application\\brave.exe",
            ],
            "edge": [
                str(local_app_data / "Microsoft" / "Edge" / "Application" / "msedge.exe"),
                "C:\\Program Files\\Microsoft\\Edge\\Application\\msedge.exe",
                "C:\\Program Files (x86)\\Microsoft\\Edge\\Application\\msedge.exe",
            ],
        }
    else:
        known = {}
    for candidate in known.get(browser, []):
        if candidate and Path(candidate).exists():
            return candidate
    return None
# Maps a browser identifier to the function that locates its default
# user-data directory; chromium reuses the Chrome finder because that
# function checks Chromium roots as well.
BROWSER_PROFILE_FINDERS = {
    "chrome": get_chrome_user_data_dir,
    "chromium": get_chrome_user_data_dir,
    "brave": get_brave_user_data_dir,
    "edge": get_edge_user_data_dir,
}
# ("chrome", "chromium", "brave", "edge")
CHROMIUM_BROWSERS = tuple(BROWSER_PROFILE_FINDERS.keys())
# Header lines for generated Netscape cookies.txt files (mirrors the header
# written by the Node cookie exporter's writeCookiesFile).
NETSCAPE_COOKIE_HEADER = [
    "# Netscape HTTP Cookie File",
    "# https://curl.se/docs/http-cookies.html",
    "# This file was generated by ArchiveBox persona cookie extraction",
    "#",
    "# Format: domain\\tincludeSubdomains\\tpath\\tsecure\\texpiry\\tname\\tvalue",
    "",
]
def validate_persona_name(name: str) -> tuple[bool, str]:
    """Validate a persona name to prevent path traversal.

    Returns (is_valid, error_message); error_message is "" when valid.
    Checks run in priority order so the first failure's message is returned.
    """
    checks = (
        (not name or not name.strip(), "Persona name cannot be empty"),
        ("/" in name or "\\" in name, "Persona name cannot contain path separators (/ or \\)"),
        (".." in name, "Persona name cannot contain parent directory references (..)"),
        (name.startswith("."), "Persona name cannot start with a dot (.)"),
        ("\x00" in name or "\n" in name or "\r" in name, "Persona name contains invalid characters"),
    )
    for failed, message in checks:
        if failed:
            return False, message
    return True, ""
def discover_local_browser_profiles() -> list[PersonaImportSource]:
    """Enumerate importable Chromium-family profiles found on this host.

    Scans each known browser's default user-data root, then appends any
    existing persona-template profiles. Profiles that fail validation in
    resolve_browser_profile_source are silently skipped.
    """
    found: list[PersonaImportSource] = []
    for browser, finder in BROWSER_PROFILE_FINDERS.items():
        root = finder()
        if not root:
            continue
        binary = get_browser_binary(browser)
        for profile_name in _list_profile_names(root):
            try:
                source = resolve_browser_profile_source(
                    browser=browser,
                    user_data_dir=root,
                    profile_dir=profile_name,
                    browser_binary=binary,
                )
            except ValueError:
                continue
            found.append(source)
    found.extend(discover_persona_template_profiles())
    return found
def discover_persona_template_profiles(personas_dir: Path | None = None) -> list[PersonaImportSource]:
    """Discover persona dirs that already contain a Chromium user-data tree.

    Looks under *personas_dir* when given, otherwise under the configured
    PERSONAS_DIR plus ~/.config/abx/personas. Duplicate roots (after
    resolution) are scanned only once; invalid profiles are skipped.
    """
    from archivebox.config.constants import CONSTANTS

    if personas_dir is not None:
        roots = [personas_dir.expanduser()]
    else:
        roots = [
            CONSTANTS.PERSONAS_DIR.expanduser(),
            Path.home() / ".config" / "abx" / "personas",
        ]
    results: list[PersonaImportSource] = []
    visited: set[Path] = set()
    for root in roots:
        resolved = root.resolve()
        if resolved in visited:
            continue
        visited.add(resolved)
        if not (resolved.exists() and resolved.is_dir()):
            continue
        # Stable, case-insensitive ordering of persona directories.
        persona_dirs = sorted(
            (entry for entry in resolved.iterdir() if entry.is_dir()),
            key=lambda entry: entry.name.lower(),
        )
        for persona_dir in persona_dirs:
            for dir_name in PERSONA_PROFILE_DIR_CANDIDATES:
                user_data_dir = persona_dir / dir_name
                if not (user_data_dir.exists() and user_data_dir.is_dir()):
                    continue
                for profile_name in _list_profile_names(user_data_dir):
                    try:
                        results.append(
                            resolve_browser_profile_source(
                                browser="persona",
                                source_name=persona_dir.name,
                                user_data_dir=user_data_dir,
                                profile_dir=profile_name,
                                browser_binary=get_browser_binary("chrome"),
                            )
                        )
                    except ValueError:
                        continue
    return results
def resolve_browser_import_source(browser: str, profile_dir: str | None = None) -> PersonaImportSource:
    """Resolve a named browser ("chrome", "brave", ...) into an import source.

    Raises ValueError when the browser is unknown, its user-data dir cannot
    be found, or no usable profile exists within it.
    """
    browser = browser.lower().strip()
    finder = BROWSER_PROFILE_FINDERS.get(browser)
    if finder is None:
        supported = ", ".join(BROWSER_PROFILE_FINDERS)
        raise ValueError(f"Unknown browser: {browser}. Supported browsers: {supported}")
    user_data_dir = finder()
    if not user_data_dir:
        raise ValueError(f"Could not find {browser} profile directory")
    chosen = profile_dir or pick_default_profile_dir(user_data_dir)
    if not chosen:
        raise ValueError(f"Could not find a profile in {user_data_dir}")
    return resolve_browser_profile_source(
        browser=browser,
        user_data_dir=user_data_dir,
        profile_dir=chosen,
        browser_binary=get_browser_binary(browser),
    )
def resolve_browser_profile_source(
    browser: str,
    user_data_dir: Path,
    profile_dir: str,
    source_name: str | None = None,
    browser_binary: str | None = None,
) -> PersonaImportSource:
    """Validate a (user-data root, profile name) pair into a PersonaImportSource.

    Raises ValueError when the root is missing, the profile name is blank,
    or the profile directory does not look like a real Chromium profile.
    """
    root = user_data_dir.expanduser()
    # Only force a resolve() for relative paths; absolute ones are kept as-is.
    if not root.is_absolute():
        root = root.resolve()
    if not root.exists():
        raise ValueError(f"Profile root does not exist: {root}")
    if not profile_dir.strip():
        raise ValueError("Profile directory name cannot be empty.")
    candidate = root / profile_dir
    if not _looks_like_profile_dir(candidate):
        raise ValueError(f"Profile directory does not look valid: {candidate}")
    return PersonaImportSource(
        kind="browser-profile",
        browser=browser,
        source_name=source_name,
        user_data_dir=root,
        profile_dir=profile_dir,
        browser_binary=browser_binary,
    )
def resolve_custom_import_source(raw_value: str, profile_dir: str | None = None) -> PersonaImportSource:
    """Turn a user-supplied absolute path or CDP URL into a PersonaImportSource.

    CDP URLs short-circuit with no filesystem checks. Paths must be absolute
    and existing; they may point at a profile dir directly or at a browser
    root (in which case *profile_dir* or the default profile is used).
    Raises ValueError on any validation failure.
    """
    raw_value = raw_value.strip()
    if not raw_value:
        raise ValueError("Provide an absolute browser profile path or a CDP URL.")
    if _looks_like_cdp_url(raw_value):
        return PersonaImportSource(kind="cdp", cdp_url=raw_value)
    source_path = Path(raw_value).expanduser()
    if not source_path.is_absolute():
        raise ValueError("Custom browser path must be an absolute path.")
    if not source_path.exists():
        raise ValueError(f"Custom browser path does not exist: {source_path}")
    wanted_profile = profile_dir.strip() if profile_dir else ""
    if _looks_like_profile_dir(source_path):
        # The path already IS a profile dir; a conflicting explicit name is an error.
        if wanted_profile and wanted_profile != source_path.name:
            raise ValueError("Profile name does not match the provided profile directory path.")
        return resolve_browser_profile_source(
            browser="custom",
            user_data_dir=source_path.parent.resolve(),
            profile_dir=source_path.name,
        )
    chosen = wanted_profile or pick_default_profile_dir(source_path)
    if not chosen:
        raise ValueError(
            "Could not find a Chromium profile in that directory. "
            "Provide an exact profile directory path or fill in the profile name field."
        )
    return resolve_browser_profile_source(
        browser="custom",
        user_data_dir=source_path.resolve(),
        profile_dir=chosen,
    )
def pick_default_profile_dir(user_data_dir: Path) -> str | None:
    """Choose the profile to use when none was specified: prefer "Default",
    otherwise the first discovered name; None when the root has no profiles."""
    names = _list_profile_names(user_data_dir)
    if not names:
        return None
    return "Default" if "Default" in names else names[0]
def import_persona_from_source(
    persona: "Persona",
    source: PersonaImportSource,
    *,
    copy_profile: bool = True,
    import_cookies: bool = True,
    capture_storage: bool = False,
) -> PersonaImportResult:
    """Import browser state (profile dir, cookies, storage) into a persona.

    Depending on source.kind:
      - "browser-profile": optionally copies the local Chromium user-data dir
        into the persona's chrome_user_data dir, then exports state from it.
      - "cdp": connects to a remote browser over CDP; only cookies and open-tab
        storage can be pulled, so no profile copy is possible.

    Non-fatal problems are collected as warnings on the returned
    PersonaImportResult rather than raised.
    """
    # Make sure the persona's on-disk directory layout exists before writing into it.
    persona.ensure_dirs()
    result = PersonaImportResult(source=source)
    persona_chrome_dir = Path(persona.CHROME_USER_DATA_DIR)
    cookies_file = persona.path / "cookies.txt"
    auth_file = persona.path / "auth.json"
    # user-data dir to launch the export browser from (stays None for CDP sources)
    launch_user_data_dir: Path | None = None
    if source.kind == "browser-profile":
        if copy_profile and source.user_data_dir:
            resolved_source_root = source.user_data_dir.resolve()
            resolved_persona_root = persona_chrome_dir.resolve()
            # Guard against copying the persona's own dir onto itself
            # (copy_browser_user_data_dir deletes the destination first).
            if resolved_source_root == resolved_persona_root:
                result.warnings.append("Skipped profile copy because the selected source is already this persona's chrome_user_data directory.")
            else:
                copy_browser_user_data_dir(resolved_source_root, resolved_persona_root)
                persona.cleanup_chrome_profile(resolved_persona_root)
                result.profile_copied = True
            # Export from the persona's own copy so the original profile stays untouched.
            launch_user_data_dir = resolved_persona_root
        else:
            # No copy requested: export directly from the source profile in place.
            launch_user_data_dir = source.user_data_dir
    elif copy_profile:
        result.warnings.append("Profile copying is only available for local Chromium profile paths. CDP imports can only pull cookies and open-tab storage.")
    if source.kind == "cdp":
        export_success, auth_payload, export_message = export_browser_state(
            cdp_url=source.cdp_url,
            cookies_output_file=cookies_file if import_cookies else None,
            auth_output_file=auth_file if capture_storage else None,
        )
    else:
        export_success, auth_payload, export_message = export_browser_state(
            user_data_dir=launch_user_data_dir,
            profile_dir=source.profile_dir,
            chrome_binary=source.browser_binary,
            cookies_output_file=cookies_file if import_cookies else None,
            auth_output_file=auth_file if capture_storage else None,
        )
    if not export_success:
        # Export failure is non-fatal: any profile copy made above is still kept.
        result.warnings.append(export_message or "Browser import failed.")
        return result
    # Flag which artifacts actually materialized on disk after the export.
    if import_cookies and cookies_file.exists():
        result.cookies_imported = True
    if capture_storage and auth_file.exists():
        result.storage_captured = True
    if _apply_imported_user_agent(persona, auth_payload):
        result.user_agent_imported = True
    return result
def copy_browser_user_data_dir(source_dir: Path, destination_dir: Path) -> None:
    """Replace *destination_dir* with a copy of *source_dir*, skipping volatile files.

    Symlinks are preserved as symlinks; files matching
    VOLATILE_PROFILE_COPY_PATTERNS (locks, caches, sockets, ...) are excluded.
    """
    destination_dir.parent.mkdir(parents=True, exist_ok=True)
    # Blow away any previous copy so stale profile files never linger.
    shutil.rmtree(destination_dir, ignore_errors=True)
    skip_volatile = shutil.ignore_patterns(*VOLATILE_PROFILE_COPY_PATTERNS)
    shutil.copytree(source_dir, destination_dir, symlinks=True, ignore=skip_volatile)
def export_browser_state(
*,
user_data_dir: Path | None = None,
cdp_url: str | None = None,
profile_dir: str | None = None,
chrome_binary: str | None = None,
cookies_output_file: Path | None = None,
auth_output_file: Path | None = None,
) -> tuple[bool, dict | None, str]:
if not user_data_dir and not cdp_url:
return False, None, "Missing browser source."
from abx_plugins import get_plugins_dir
from archivebox.config.common import STORAGE_CONFIG
state_script = Path(__file__).with_name("export_browser_state.js")
if not state_script.exists():
return False, None, f"Browser state export script not found at {state_script}"
node_modules_dir = STORAGE_CONFIG.LIB_DIR / "npm" / "node_modules"
chrome_plugin_dir = Path(get_plugins_dir()).resolve()
env = os.environ.copy()
env["NODE_MODULES_DIR"] = str(node_modules_dir)
env["ARCHIVEBOX_ABX_PLUGINS_DIR"] = str(chrome_plugin_dir)
if user_data_dir:
env["CHROME_USER_DATA_DIR"] = str(user_data_dir)
if cdp_url:
env["CHROME_CDP_URL"] = cdp_url
env["CHROME_IS_LOCAL"] = "false"
if chrome_binary:
env["CHROME_BINARY"] = str(chrome_binary)
if profile_dir:
extra_arg = f"--profile-directory={profile_dir}"
existing_extra = env.get("CHROME_ARGS_EXTRA", "").strip()
args_list: list[str] = []
if existing_extra:
if existing_extra.startswith("["):
try:
parsed = json.loads(existing_extra)
if isinstance(parsed, list):
args_list.extend(str(x) for x in parsed)
except Exception:
args_list.extend([s.strip() for s in existing_extra.split(",") if s.strip()])
else:
args_list.extend([s.strip() for s in existing_extra.split(",") if s.strip()])
args_list.append(extra_arg)
env["CHROME_ARGS_EXTRA"] = json.dumps(args_list)
temp_dir: Path | None = None
tmp_cookies_file: Path | None = None
tmp_auth_file: Path | None = None
if cookies_output_file and cookies_output_file.exists():
temp_dir = Path(tempfile.mkdtemp(prefix="ab_browser_state_"))
tmp_cookies_file = temp_dir / "cookies.txt"
env["COOKIES_OUTPUT_FILE"] = str(tmp_cookies_file)
elif cookies_output_file:
env["COOKIES_OUTPUT_FILE"] = str(cookies_output_file)
if auth_output_file and auth_output_file.exists():
temp_dir = temp_dir or Path(tempfile.mkdtemp(prefix="ab_browser_state_"))
tmp_auth_file = temp_dir / "auth.json"
env["AUTH_STORAGE_OUTPUT_FILE"] = str(tmp_auth_file)
elif auth_output_file:
env["AUTH_STORAGE_OUTPUT_FILE"] = str(auth_output_file)
else:
temp_dir = temp_dir or Path(tempfile.mkdtemp(prefix="ab_browser_state_"))
tmp_auth_file = temp_dir / "auth.json"
env["AUTH_STORAGE_OUTPUT_FILE"] = str(tmp_auth_file)
try:
result = subprocess.run(
["node", str(state_script)],
env=env,
capture_output=True,
text=True,
timeout=120,
)
except subprocess.TimeoutExpired:
return False, None, "Browser state export timed out."
except FileNotFoundError:
return False, None, "Node.js was not found, so ArchiveBox could not extract browser state."
except Exception as err:
return False, None, f"Browser state export failed: {err}"
if result.returncode != 0:
message = (result.stderr or result.stdout or "").strip() or "Browser state export failed."
return False, None, message
auth_payload: dict | None = None
if cookies_output_file and tmp_cookies_file and tmp_cookies_file.exists():
_merge_netscape_cookies(cookies_output_file, tmp_cookies_file)
if auth_output_file and tmp_auth_file and tmp_auth_file.exists():
_merge_auth_storage(auth_output_file, tmp_auth_file)
auth_payload = _load_auth_storage(tmp_auth_file)
elif auth_output_file and auth_output_file.exists():
auth_payload = _load_auth_storage(auth_output_file)
elif tmp_auth_file and tmp_auth_file.exists():
auth_payload = _load_auth_storage(tmp_auth_file)
if temp_dir and temp_dir.exists():
shutil.rmtree(temp_dir, ignore_errors=True)
return True, auth_payload, (result.stderr or result.stdout or "").strip()
def _list_profile_names(user_data_dir: Path) -> list[str]:
    """Return names of Chromium profile subdirectories inside *user_data_dir*.

    Results are sorted case-insensitively; files and Chromium's internal
    "System Profile" directory are skipped.
    """
    if not user_data_dir.exists() or not user_data_dir.is_dir():
        return []
    profiles: list[str] = []
    for child in sorted(user_data_dir.iterdir(), key=lambda path: path.name.lower()):
        if not child.is_dir():
            continue
        if child.name == "System Profile":
            continue
        # NOTE: a previous version special-cased "Default"/"Profile N"/"Guest Profile"
        # names, but both branches applied this identical check before appending,
        # so the name test was dead code and has been removed.
        if _looks_like_profile_dir(child):
            profiles.append(child.name)
    return profiles
def _looks_like_profile_dir(path: Path) -> bool:
    """Heuristically decide whether *path* is a Chromium profile directory."""
    if not (path.exists() and path.is_dir()):
        return False
    # Files/dirs Chromium creates inside every real profile directory.
    marker_names = (
        "Preferences",
        "History",
        "Cookies",
        "Network/Cookies",
        "Local Storage",
        "Session Storage",
    )
    for marker_name in marker_names:
        if (path / marker_name).exists():
            return True
    # Fall back to well-known profile directory names ("Default", "Profile 1", ...).
    return any(path.name == prefix or path.name.startswith(prefix) for prefix in BROWSER_PROFILE_DIR_NAMES)
def _looks_like_cdp_url(value: str) -> bool:
    """Return True if *value* parses as a ws(s):// or http(s):// URL with a host."""
    parts = urlparse(value)
    if parts.scheme not in ("ws", "wss", "http", "https"):
        return False
    return bool(parts.netloc)
def _parse_netscape_cookies(path: Path) -> dict[tuple[str, str, str], tuple[str, str, str, str, str, str, str]]:
    """Parse a Netscape-format cookies.txt into {(domain, path, name): 7-field tuple}.

    Returns an empty dict when the file is missing; comment/header lines and
    malformed rows (fewer than 7 tab-separated fields) are skipped.
    """
    parsed: dict[tuple[str, str, str], tuple[str, str, str, str, str, str, str]] = {}
    if not path.exists():
        return parsed
    for raw_line in path.read_text().splitlines():
        # Skip blanks and the "# Netscape HTTP Cookie File" style header lines.
        if not raw_line or raw_line.startswith("#"):
            continue
        fields = raw_line.split("\t")
        if len(fields) < 7:
            continue
        record = tuple(fields[:7])
        # Fields are: domain, include_subdomains, path, secure, expiry, name, value.
        parsed[(record[0], record[2], record[5])] = record
    return parsed
def _write_netscape_cookies(
    path: Path,
    cookies: dict[tuple[str, str, str], tuple[str, str, str, str, str, str, str]],
) -> None:
    """Write cookie 7-tuples back out as a Netscape-format cookies.txt with header."""
    output_lines = list(NETSCAPE_COOKIE_HEADER)
    output_lines.extend("\t".join(cookie) for cookie in cookies.values())
    # Trailing newline so the file ends cleanly for downstream tools.
    path.write_text("\n".join(output_lines) + "\n")
def _merge_netscape_cookies(existing_file: Path, new_file: Path) -> None:
    """Merge cookies from *new_file* into *existing_file*; new entries win on conflict."""
    combined = _parse_netscape_cookies(existing_file)
    combined.update(_parse_netscape_cookies(new_file))
    _write_netscape_cookies(existing_file, combined)
def _merge_auth_storage(existing_file: Path, new_file: Path) -> None:
    """Merge the auth.json at *new_file* into *existing_file* and rewrite it.

    localStorage/sessionStorage merge per-origin (a freshly exported origin
    replaces the stored one wholesale); cookies merge by (domain, path, name);
    the newly captured user-agent wins over the stored one.
    """
    stored = _load_auth_storage(existing_file)
    fresh = _load_auth_storage(new_file)
    local_storage = stored.setdefault("localStorage", {})
    session_storage = stored.setdefault("sessionStorage", {})
    local_storage.update(fresh.get("localStorage") or {})
    session_storage.update(fresh.get("sessionStorage") or {})
    merged = {
        **stored,
        **fresh,
        "cookies": _merge_cookie_dicts(stored.get("cookies") or [], fresh.get("cookies") or []),
        "localStorage": local_storage,
        "sessionStorage": session_storage,
        # Prefer the newly captured user-agent, falling back to the stored one.
        "user_agent": fresh.get("user_agent") or stored.get("user_agent") or "",
    }
    existing_file.write_text(json.dumps(merged, indent=2, sort_keys=True) + "\n")
def _load_auth_storage(path: Path) -> dict:
    """Load an auth.json storage payload from *path*.

    Returns an empty auth skeleton when the file is missing, is not valid JSON,
    or does not contain a JSON object — callers always get a dict back.
    """
    # Single source of truth for the empty payload (was duplicated three times).
    empty_payload: dict = {
        "TYPE": "auth",
        "cookies": [],
        "localStorage": {},
        "sessionStorage": {},
    }
    if not path.exists():
        return empty_payload
    try:
        payload = json.loads(path.read_text())
    except json.JSONDecodeError:
        return empty_payload
    return payload if isinstance(payload, dict) else empty_payload
def _merge_cookie_dicts(existing: list[dict], new: list[dict]) -> list[dict]:
    """Union two cookie-dict lists keyed by (domain, path, name); entries from *new* win.

    Insertion order is preserved: existing cookies first, then any new keys.
    """
    def cookie_key(cookie: dict) -> tuple[str, str, str]:
        # Missing path defaults to "/" to match how browsers key cookies.
        return (str(cookie.get("domain") or ""), str(cookie.get("path") or "/"), str(cookie.get("name") or ""))

    merged: dict[tuple[str, str, str], dict] = {}
    # Later entries (from *new*) overwrite earlier ones with the same key.
    # (The key extraction was previously copy-pasted across two identical loops.)
    for cookie in (*existing, *new):
        merged[cookie_key(cookie)] = cookie
    return list(merged.values())
def _apply_imported_user_agent(persona: "Persona", auth_payload: dict | None) -> bool:
if not auth_payload:
return False
user_agent = str(auth_payload.get("user_agent") or "").strip()
if not user_agent:
return False
config = dict(persona.config or {})
if config.get("USER_AGENT") == user_agent:
return False
config["USER_AGENT"] = user_agent
persona.config = config
persona.save(update_fields=["config"])
return True

View File

@@ -117,6 +117,12 @@ class Persona(ModelWithConfig):
cookies_path = self.path / 'cookies.txt'
return str(cookies_path) if cookies_path.exists() else ''
@property
def AUTH_STORAGE_FILE(self) -> str:
"""Derived path to auth.json for this persona (if it exists)."""
auth_path = self.path / 'auth.json'
return str(auth_path) if auth_path.exists() else ''
def get_derived_config(self) -> dict:
"""
Get config dict with derived paths filled in.
@@ -127,6 +133,7 @@ class Persona(ModelWithConfig):
- CHROME_EXTENSIONS_DIR (derived from persona path)
- CHROME_DOWNLOADS_DIR (derived from persona path)
- COOKIES_FILE (derived from persona path, if file exists)
- AUTH_STORAGE_FILE (derived from persona path, if file exists)
- ACTIVE_PERSONA (set to this persona's name)
"""
derived = dict(self.config or {})
@@ -140,6 +147,8 @@ class Persona(ModelWithConfig):
derived['CHROME_DOWNLOADS_DIR'] = self.CHROME_DOWNLOADS_DIR
if 'COOKIES_FILE' not in derived and self.COOKIES_FILE:
derived['COOKIES_FILE'] = self.COOKIES_FILE
if 'AUTH_STORAGE_FILE' not in derived and self.AUTH_STORAGE_FILE:
derived['AUTH_STORAGE_FILE'] = self.AUTH_STORAGE_FILE
# Always set ACTIVE_PERSONA to this persona's name
derived['ACTIVE_PERSONA'] = self.name