mirror of
https://github.com/ArchiveBox/ArchiveBox.git
synced 2026-04-06 07:47:53 +10:00
856 lines
30 KiB
Python
856 lines
30 KiB
Python
"""
|
|
Shared persona browser discovery/import helpers.
|
|
|
|
These helpers are used by both the CLI and the Django admin so Persona import
|
|
behavior stays consistent regardless of where it is triggered from.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import platform
|
|
import shutil
|
|
import subprocess
|
|
import tempfile
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING
|
|
from urllib.parse import urlparse
|
|
|
|
from django.utils.html import format_html
|
|
from django.utils.safestring import SafeString
|
|
|
|
if TYPE_CHECKING:
|
|
from archivebox.personas.models import Persona
|
|
|
|
|
|
BROWSER_LABELS = {
|
|
"chrome": "Google Chrome",
|
|
"chromium": "Chromium",
|
|
"brave": "Brave",
|
|
"edge": "Microsoft Edge",
|
|
"custom": "Custom Path",
|
|
"persona": "Persona Template",
|
|
}
|
|
|
|
BROWSER_PROFILE_DIR_NAMES = (
|
|
"Default",
|
|
"Profile ",
|
|
"Guest Profile",
|
|
)
|
|
|
|
VOLATILE_PROFILE_COPY_PATTERNS = (
|
|
"Cache",
|
|
"Code Cache",
|
|
"GPUCache",
|
|
"ShaderCache",
|
|
"Service Worker",
|
|
"GCM Store",
|
|
"*.log",
|
|
"Crashpad",
|
|
"BrowserMetrics",
|
|
"BrowserMetrics-spare.pma",
|
|
"SingletonLock",
|
|
"SingletonSocket",
|
|
"SingletonCookie",
|
|
)
|
|
|
|
PERSONA_PROFILE_DIR_CANDIDATES = (
|
|
"chrome_profile",
|
|
"chrome_user_data",
|
|
)
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class PersonaImportSource:
|
|
kind: str
|
|
browser: str = "custom"
|
|
source_name: str | None = None
|
|
user_data_dir: Path | None = None
|
|
profile_dir: str | None = None
|
|
browser_binary: str | None = None
|
|
cdp_url: str | None = None
|
|
|
|
@property
|
|
def browser_label(self) -> str:
|
|
return BROWSER_LABELS.get(self.browser, self.browser.title())
|
|
|
|
@property
|
|
def profile_path(self) -> Path | None:
|
|
if not self.user_data_dir or not self.profile_dir:
|
|
return None
|
|
return self.user_data_dir / self.profile_dir
|
|
|
|
@property
|
|
def display_label(self) -> str:
|
|
if self.kind == "cdp":
|
|
return self.cdp_url or "CDP URL"
|
|
profile_suffix = f" / {self.profile_dir}" if self.profile_dir else ""
|
|
source_prefix = f": {self.source_name}" if self.source_name else ""
|
|
return f"{self.browser_label}{source_prefix}{profile_suffix}"
|
|
|
|
@property
|
|
def choice_value(self) -> str:
|
|
return json.dumps(
|
|
{
|
|
"kind": self.kind,
|
|
"browser": self.browser,
|
|
"source_name": self.source_name or "",
|
|
"user_data_dir": str(self.user_data_dir) if self.user_data_dir else "",
|
|
"profile_dir": self.profile_dir or "",
|
|
"browser_binary": self.browser_binary or "",
|
|
"cdp_url": self.cdp_url or "",
|
|
},
|
|
sort_keys=True,
|
|
)
|
|
|
|
def as_choice_label(self) -> SafeString:
|
|
path_str = str(self.profile_path or self.user_data_dir or self.cdp_url or "")
|
|
binary_suffix = f"Using {self.browser_binary}" if self.browser_binary else "Will auto-detect a Chromium binary"
|
|
return format_html(
|
|
'<span class="abx-profile-option"><strong>{}</strong><span class="abx-profile-option__meta">{}</span><code>{}</code></span>',
|
|
self.display_label,
|
|
binary_suffix,
|
|
path_str,
|
|
)
|
|
|
|
@classmethod
|
|
def from_choice_value(cls, value: str) -> PersonaImportSource:
|
|
try:
|
|
payload = json.loads(value)
|
|
except json.JSONDecodeError as err:
|
|
raise ValueError("Invalid discovered profile selection.") from err
|
|
|
|
if payload.get("kind") != "browser-profile":
|
|
raise ValueError("Invalid discovered profile selection.")
|
|
|
|
user_data_dir = Path(str(payload.get("user_data_dir") or "")).expanduser()
|
|
profile_dir = str(payload.get("profile_dir") or "").strip()
|
|
browser = str(payload.get("browser") or "custom").strip().lower() or "custom"
|
|
source_name = str(payload.get("source_name") or "").strip() or None
|
|
browser_binary = str(payload.get("browser_binary") or "").strip() or None
|
|
|
|
return resolve_browser_profile_source(
|
|
browser=browser,
|
|
source_name=source_name,
|
|
user_data_dir=user_data_dir,
|
|
profile_dir=profile_dir,
|
|
browser_binary=browser_binary,
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class PersonaImportResult:
|
|
source: PersonaImportSource
|
|
profile_copied: bool = False
|
|
cookies_imported: bool = False
|
|
storage_captured: bool = False
|
|
user_agent_imported: bool = False
|
|
warnings: list[str] = field(default_factory=list)
|
|
|
|
@property
|
|
def did_work(self) -> bool:
|
|
return self.profile_copied or self.cookies_imported or self.storage_captured or self.user_agent_imported
|
|
|
|
|
|
def get_chrome_user_data_dir() -> Path | None:
|
|
"""Get the default Chrome user data directory for the current platform."""
|
|
system = platform.system()
|
|
home = Path.home()
|
|
|
|
if system == "Darwin":
|
|
candidates = [
|
|
home / "Library" / "Application Support" / "Google" / "Chrome",
|
|
home / "Library" / "Application Support" / "Chromium",
|
|
]
|
|
elif system == "Linux":
|
|
candidates = [
|
|
home / ".config" / "google-chrome",
|
|
home / ".config" / "chromium",
|
|
home / ".config" / "chrome",
|
|
home / "snap" / "chromium" / "common" / "chromium",
|
|
]
|
|
elif system == "Windows":
|
|
local_app_data = Path(os.environ.get("LOCALAPPDATA", home / "AppData" / "Local"))
|
|
candidates = [
|
|
local_app_data / "Google" / "Chrome" / "User Data",
|
|
local_app_data / "Chromium" / "User Data",
|
|
]
|
|
else:
|
|
candidates = []
|
|
|
|
for candidate in candidates:
|
|
if candidate.exists() and _list_profile_names(candidate):
|
|
return candidate
|
|
|
|
return None
|
|
|
|
|
|
def get_brave_user_data_dir() -> Path | None:
|
|
"""Get the default Brave user data directory for the current platform."""
|
|
system = platform.system()
|
|
home = Path.home()
|
|
|
|
if system == "Darwin":
|
|
candidates = [
|
|
home / "Library" / "Application Support" / "BraveSoftware" / "Brave-Browser",
|
|
]
|
|
elif system == "Linux":
|
|
candidates = [
|
|
home / ".config" / "BraveSoftware" / "Brave-Browser",
|
|
]
|
|
elif system == "Windows":
|
|
local_app_data = Path(os.environ.get("LOCALAPPDATA", home / "AppData" / "Local"))
|
|
candidates = [
|
|
local_app_data / "BraveSoftware" / "Brave-Browser" / "User Data",
|
|
]
|
|
else:
|
|
candidates = []
|
|
|
|
for candidate in candidates:
|
|
if candidate.exists() and _list_profile_names(candidate):
|
|
return candidate
|
|
|
|
return None
|
|
|
|
|
|
def get_edge_user_data_dir() -> Path | None:
|
|
"""Get the default Edge user data directory for the current platform."""
|
|
system = platform.system()
|
|
home = Path.home()
|
|
|
|
if system == "Darwin":
|
|
candidates = [
|
|
home / "Library" / "Application Support" / "Microsoft Edge",
|
|
]
|
|
elif system == "Linux":
|
|
candidates = [
|
|
home / ".config" / "microsoft-edge",
|
|
home / ".config" / "microsoft-edge-beta",
|
|
home / ".config" / "microsoft-edge-dev",
|
|
]
|
|
elif system == "Windows":
|
|
local_app_data = Path(os.environ.get("LOCALAPPDATA", home / "AppData" / "Local"))
|
|
candidates = [
|
|
local_app_data / "Microsoft" / "Edge" / "User Data",
|
|
]
|
|
else:
|
|
candidates = []
|
|
|
|
for candidate in candidates:
|
|
if candidate.exists() and _list_profile_names(candidate):
|
|
return candidate
|
|
|
|
return None
|
|
|
|
|
|
def get_browser_binary(browser: str) -> str | None:
|
|
system = platform.system()
|
|
home = Path.home()
|
|
browser = browser.lower()
|
|
|
|
if system == "Darwin":
|
|
candidates = {
|
|
"chrome": ["/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"],
|
|
"chromium": ["/Applications/Chromium.app/Contents/MacOS/Chromium"],
|
|
"brave": ["/Applications/Brave Browser.app/Contents/MacOS/Brave Browser"],
|
|
"edge": ["/Applications/Microsoft Edge.app/Contents/MacOS/Microsoft Edge"],
|
|
}.get(browser, [])
|
|
elif system == "Linux":
|
|
candidates = {
|
|
"chrome": [
|
|
"/usr/bin/google-chrome",
|
|
"/usr/bin/google-chrome-stable",
|
|
"/usr/bin/google-chrome-beta",
|
|
"/usr/bin/google-chrome-unstable",
|
|
],
|
|
"chromium": ["/usr/bin/chromium", "/usr/bin/chromium-browser"],
|
|
"brave": ["/usr/bin/brave-browser", "/usr/bin/brave-browser-beta", "/usr/bin/brave-browser-nightly"],
|
|
"edge": [
|
|
"/usr/bin/microsoft-edge",
|
|
"/usr/bin/microsoft-edge-stable",
|
|
"/usr/bin/microsoft-edge-beta",
|
|
"/usr/bin/microsoft-edge-dev",
|
|
],
|
|
}.get(browser, [])
|
|
elif system == "Windows":
|
|
local_app_data = Path(os.environ.get("LOCALAPPDATA", home / "AppData" / "Local"))
|
|
candidates = {
|
|
"chrome": [
|
|
str(local_app_data / "Google" / "Chrome" / "Application" / "chrome.exe"),
|
|
"C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe",
|
|
"C:\\Program Files (x86)\\Google\\Chrome\\Application\\chrome.exe",
|
|
],
|
|
"chromium": [str(local_app_data / "Chromium" / "Application" / "chrome.exe")],
|
|
"brave": [
|
|
str(local_app_data / "BraveSoftware" / "Brave-Browser" / "Application" / "brave.exe"),
|
|
"C:\\Program Files\\BraveSoftware\\Brave-Browser\\Application\\brave.exe",
|
|
"C:\\Program Files (x86)\\BraveSoftware\\Brave-Browser\\Application\\brave.exe",
|
|
],
|
|
"edge": [
|
|
str(local_app_data / "Microsoft" / "Edge" / "Application" / "msedge.exe"),
|
|
"C:\\Program Files\\Microsoft\\Edge\\Application\\msedge.exe",
|
|
"C:\\Program Files (x86)\\Microsoft\\Edge\\Application\\msedge.exe",
|
|
],
|
|
}.get(browser, [])
|
|
else:
|
|
candidates = []
|
|
|
|
for candidate in candidates:
|
|
if candidate and Path(candidate).exists():
|
|
return candidate
|
|
|
|
return None
|
|
|
|
|
|
BROWSER_PROFILE_FINDERS = {
|
|
"chrome": get_chrome_user_data_dir,
|
|
"chromium": get_chrome_user_data_dir,
|
|
"brave": get_brave_user_data_dir,
|
|
"edge": get_edge_user_data_dir,
|
|
}
|
|
|
|
CHROMIUM_BROWSERS = tuple(BROWSER_PROFILE_FINDERS.keys())
|
|
|
|
|
|
NETSCAPE_COOKIE_HEADER = [
|
|
"# Netscape HTTP Cookie File",
|
|
"# https://curl.se/docs/http-cookies.html",
|
|
"# This file was generated by ArchiveBox persona cookie extraction",
|
|
"#",
|
|
"# Format: domain\\tincludeSubdomains\\tpath\\tsecure\\texpiry\\tname\\tvalue",
|
|
"",
|
|
]
|
|
|
|
|
|
def validate_persona_name(name: str) -> tuple[bool, str]:
|
|
"""Validate persona name to prevent path traversal."""
|
|
if not name or not name.strip():
|
|
return False, "Persona name cannot be empty"
|
|
if "/" in name or "\\" in name:
|
|
return False, "Persona name cannot contain path separators (/ or \\)"
|
|
if ".." in name:
|
|
return False, "Persona name cannot contain parent directory references (..)"
|
|
if name.startswith("."):
|
|
return False, "Persona name cannot start with a dot (.)"
|
|
if "\x00" in name or "\n" in name or "\r" in name:
|
|
return False, "Persona name contains invalid characters"
|
|
return True, ""
|
|
|
|
|
|
def discover_local_browser_profiles() -> list[PersonaImportSource]:
|
|
discovered: list[PersonaImportSource] = []
|
|
|
|
for browser, finder in BROWSER_PROFILE_FINDERS.items():
|
|
user_data_dir = finder()
|
|
if not user_data_dir:
|
|
continue
|
|
|
|
browser_binary = get_browser_binary(browser)
|
|
for profile_dir in _list_profile_names(user_data_dir):
|
|
try:
|
|
discovered.append(
|
|
resolve_browser_profile_source(
|
|
browser=browser,
|
|
user_data_dir=user_data_dir,
|
|
profile_dir=profile_dir,
|
|
browser_binary=browser_binary,
|
|
),
|
|
)
|
|
except ValueError:
|
|
continue
|
|
|
|
discovered.extend(discover_persona_template_profiles())
|
|
|
|
return discovered
|
|
|
|
|
|
def discover_persona_template_profiles(personas_dir: Path | None = None) -> list[PersonaImportSource]:
|
|
from archivebox.config.constants import CONSTANTS
|
|
|
|
templates: list[PersonaImportSource] = []
|
|
candidate_roots: list[Path] = []
|
|
|
|
if personas_dir is not None:
|
|
candidate_roots.append(personas_dir.expanduser())
|
|
else:
|
|
candidate_roots.extend(
|
|
[
|
|
CONSTANTS.PERSONAS_DIR.expanduser(),
|
|
Path.home() / ".config" / "abx" / "personas",
|
|
],
|
|
)
|
|
|
|
seen_roots: set[Path] = set()
|
|
for personas_root in candidate_roots:
|
|
resolved_root = personas_root.resolve()
|
|
if resolved_root in seen_roots:
|
|
continue
|
|
seen_roots.add(resolved_root)
|
|
|
|
if not resolved_root.exists() or not resolved_root.is_dir():
|
|
continue
|
|
|
|
for persona_dir in sorted((path for path in resolved_root.iterdir() if path.is_dir()), key=lambda path: path.name.lower()):
|
|
for candidate_dir_name in PERSONA_PROFILE_DIR_CANDIDATES:
|
|
user_data_dir = persona_dir / candidate_dir_name
|
|
if not user_data_dir.exists() or not user_data_dir.is_dir():
|
|
continue
|
|
|
|
for profile_dir in _list_profile_names(user_data_dir):
|
|
try:
|
|
templates.append(
|
|
resolve_browser_profile_source(
|
|
browser="persona",
|
|
source_name=persona_dir.name,
|
|
user_data_dir=user_data_dir,
|
|
profile_dir=profile_dir,
|
|
browser_binary=get_browser_binary("chrome"),
|
|
),
|
|
)
|
|
except ValueError:
|
|
continue
|
|
|
|
return templates
|
|
|
|
|
|
def resolve_browser_import_source(browser: str, profile_dir: str | None = None) -> PersonaImportSource:
|
|
browser = browser.lower().strip()
|
|
if browser not in BROWSER_PROFILE_FINDERS:
|
|
supported = ", ".join(BROWSER_PROFILE_FINDERS)
|
|
raise ValueError(f"Unknown browser: {browser}. Supported browsers: {supported}")
|
|
|
|
user_data_dir = BROWSER_PROFILE_FINDERS[browser]()
|
|
if not user_data_dir:
|
|
raise ValueError(f"Could not find {browser} profile directory")
|
|
|
|
chosen_profile = profile_dir or pick_default_profile_dir(user_data_dir)
|
|
if not chosen_profile:
|
|
raise ValueError(f"Could not find a profile in {user_data_dir}")
|
|
|
|
return resolve_browser_profile_source(
|
|
browser=browser,
|
|
user_data_dir=user_data_dir,
|
|
profile_dir=chosen_profile,
|
|
browser_binary=get_browser_binary(browser),
|
|
)
|
|
|
|
|
|
def resolve_browser_profile_source(
|
|
browser: str,
|
|
user_data_dir: Path,
|
|
profile_dir: str,
|
|
source_name: str | None = None,
|
|
browser_binary: str | None = None,
|
|
) -> PersonaImportSource:
|
|
resolved_root = user_data_dir.expanduser()
|
|
if not resolved_root.is_absolute():
|
|
resolved_root = resolved_root.resolve()
|
|
if not resolved_root.exists():
|
|
raise ValueError(f"Profile root does not exist: {resolved_root}")
|
|
if not profile_dir.strip():
|
|
raise ValueError("Profile directory name cannot be empty.")
|
|
|
|
profile_path = resolved_root / profile_dir
|
|
if not _looks_like_profile_dir(profile_path):
|
|
raise ValueError(f"Profile directory does not look valid: {profile_path}")
|
|
|
|
return PersonaImportSource(
|
|
kind="browser-profile",
|
|
browser=browser,
|
|
source_name=source_name,
|
|
user_data_dir=resolved_root,
|
|
profile_dir=profile_dir,
|
|
browser_binary=browser_binary,
|
|
)
|
|
|
|
|
|
def resolve_custom_import_source(raw_value: str, profile_dir: str | None = None) -> PersonaImportSource:
|
|
raw_value = raw_value.strip()
|
|
if not raw_value:
|
|
raise ValueError("Provide an absolute browser profile path or a CDP URL.")
|
|
|
|
if _looks_like_cdp_url(raw_value):
|
|
return PersonaImportSource(kind="cdp", cdp_url=raw_value)
|
|
|
|
source_path = Path(raw_value).expanduser()
|
|
if not source_path.is_absolute():
|
|
raise ValueError("Custom browser path must be an absolute path.")
|
|
if not source_path.exists():
|
|
raise ValueError(f"Custom browser path does not exist: {source_path}")
|
|
|
|
explicit_profile = profile_dir.strip() if profile_dir else ""
|
|
if _looks_like_profile_dir(source_path):
|
|
if explicit_profile and explicit_profile != source_path.name:
|
|
raise ValueError("Profile name does not match the provided profile directory path.")
|
|
return resolve_browser_profile_source(
|
|
browser="custom",
|
|
user_data_dir=source_path.parent.resolve(),
|
|
profile_dir=source_path.name,
|
|
)
|
|
|
|
chosen_profile = explicit_profile or pick_default_profile_dir(source_path)
|
|
if not chosen_profile:
|
|
raise ValueError(
|
|
"Could not find a Chromium profile in that directory. "
|
|
"Provide an exact profile directory path or fill in the profile name field.",
|
|
)
|
|
|
|
return resolve_browser_profile_source(
|
|
browser="custom",
|
|
user_data_dir=source_path.resolve(),
|
|
profile_dir=chosen_profile,
|
|
)
|
|
|
|
|
|
def pick_default_profile_dir(user_data_dir: Path) -> str | None:
|
|
profiles = _list_profile_names(user_data_dir)
|
|
if not profiles:
|
|
return None
|
|
if "Default" in profiles:
|
|
return "Default"
|
|
return profiles[0]
|
|
|
|
|
|
def import_persona_from_source(
|
|
persona: Persona,
|
|
source: PersonaImportSource,
|
|
*,
|
|
copy_profile: bool = True,
|
|
import_cookies: bool = True,
|
|
capture_storage: bool = False,
|
|
) -> PersonaImportResult:
|
|
persona.ensure_dirs()
|
|
result = PersonaImportResult(source=source)
|
|
|
|
persona_chrome_dir = Path(persona.CHROME_USER_DATA_DIR)
|
|
cookies_file = persona.path / "cookies.txt"
|
|
auth_file = persona.path / "auth.json"
|
|
|
|
launch_user_data_dir: Path | None = None
|
|
|
|
if source.kind == "browser-profile":
|
|
if copy_profile and source.user_data_dir:
|
|
resolved_source_root = source.user_data_dir.resolve()
|
|
resolved_persona_root = persona_chrome_dir.resolve()
|
|
if resolved_source_root == resolved_persona_root:
|
|
result.warnings.append(
|
|
"Skipped profile copy because the selected source is already this persona's chrome_user_data directory.",
|
|
)
|
|
else:
|
|
copy_browser_user_data_dir(resolved_source_root, resolved_persona_root)
|
|
persona.cleanup_chrome_profile(resolved_persona_root)
|
|
result.profile_copied = True
|
|
launch_user_data_dir = resolved_persona_root
|
|
else:
|
|
launch_user_data_dir = source.user_data_dir
|
|
elif copy_profile:
|
|
result.warnings.append(
|
|
"Profile copying is only available for local Chromium profile paths. CDP imports can only pull cookies and open-tab storage.",
|
|
)
|
|
|
|
if source.kind == "cdp":
|
|
export_success, auth_payload, export_message = export_browser_state(
|
|
cdp_url=source.cdp_url,
|
|
cookies_output_file=cookies_file if import_cookies else None,
|
|
auth_output_file=auth_file if capture_storage else None,
|
|
)
|
|
else:
|
|
export_success, auth_payload, export_message = export_browser_state(
|
|
user_data_dir=launch_user_data_dir,
|
|
profile_dir=source.profile_dir,
|
|
chrome_binary=source.browser_binary,
|
|
cookies_output_file=cookies_file if import_cookies else None,
|
|
auth_output_file=auth_file if capture_storage else None,
|
|
)
|
|
|
|
if not export_success:
|
|
result.warnings.append(export_message or "Browser import failed.")
|
|
return result
|
|
|
|
if import_cookies and cookies_file.exists():
|
|
result.cookies_imported = True
|
|
if capture_storage and auth_file.exists():
|
|
result.storage_captured = True
|
|
if _apply_imported_user_agent(persona, auth_payload):
|
|
result.user_agent_imported = True
|
|
|
|
return result
|
|
|
|
|
|
def copy_browser_user_data_dir(source_dir: Path, destination_dir: Path) -> None:
|
|
destination_dir.parent.mkdir(parents=True, exist_ok=True)
|
|
shutil.rmtree(destination_dir, ignore_errors=True)
|
|
shutil.copytree(
|
|
source_dir,
|
|
destination_dir,
|
|
symlinks=True,
|
|
ignore=shutil.ignore_patterns(*VOLATILE_PROFILE_COPY_PATTERNS),
|
|
)
|
|
|
|
|
|
def export_browser_state(
|
|
*,
|
|
user_data_dir: Path | None = None,
|
|
cdp_url: str | None = None,
|
|
profile_dir: str | None = None,
|
|
chrome_binary: str | None = None,
|
|
cookies_output_file: Path | None = None,
|
|
auth_output_file: Path | None = None,
|
|
) -> tuple[bool, dict | None, str]:
|
|
if not user_data_dir and not cdp_url:
|
|
return False, None, "Missing browser source."
|
|
|
|
from abx_plugins import get_plugins_dir
|
|
from archivebox.config.common import STORAGE_CONFIG
|
|
|
|
state_script = Path(__file__).with_name("export_browser_state.js")
|
|
if not state_script.exists():
|
|
return False, None, f"Browser state export script not found at {state_script}"
|
|
|
|
node_modules_dir = STORAGE_CONFIG.LIB_DIR / "npm" / "node_modules"
|
|
chrome_plugin_dir = Path(get_plugins_dir()).resolve()
|
|
|
|
env = os.environ.copy()
|
|
env["NODE_MODULES_DIR"] = str(node_modules_dir)
|
|
env["ARCHIVEBOX_ABX_PLUGINS_DIR"] = str(chrome_plugin_dir)
|
|
|
|
if user_data_dir:
|
|
env["CHROME_USER_DATA_DIR"] = str(user_data_dir)
|
|
if cdp_url:
|
|
env["CHROME_CDP_URL"] = cdp_url
|
|
env["CHROME_IS_LOCAL"] = "false"
|
|
if chrome_binary:
|
|
env["CHROME_BINARY"] = str(chrome_binary)
|
|
if profile_dir:
|
|
extra_arg = f"--profile-directory={profile_dir}"
|
|
existing_extra = env.get("CHROME_ARGS_EXTRA", "").strip()
|
|
args_list: list[str] = []
|
|
if existing_extra:
|
|
if existing_extra.startswith("["):
|
|
try:
|
|
parsed = json.loads(existing_extra)
|
|
if isinstance(parsed, list):
|
|
args_list.extend(str(x) for x in parsed)
|
|
except Exception:
|
|
args_list.extend([s.strip() for s in existing_extra.split(",") if s.strip()])
|
|
else:
|
|
args_list.extend([s.strip() for s in existing_extra.split(",") if s.strip()])
|
|
args_list.append(extra_arg)
|
|
env["CHROME_ARGS_EXTRA"] = json.dumps(args_list)
|
|
|
|
temp_dir: Path | None = None
|
|
tmp_cookies_file: Path | None = None
|
|
tmp_auth_file: Path | None = None
|
|
|
|
if cookies_output_file and cookies_output_file.exists():
|
|
temp_dir = Path(tempfile.mkdtemp(prefix="ab_browser_state_"))
|
|
tmp_cookies_file = temp_dir / "cookies.txt"
|
|
env["COOKIES_OUTPUT_FILE"] = str(tmp_cookies_file)
|
|
elif cookies_output_file:
|
|
env["COOKIES_OUTPUT_FILE"] = str(cookies_output_file)
|
|
|
|
if auth_output_file and auth_output_file.exists():
|
|
temp_dir = temp_dir or Path(tempfile.mkdtemp(prefix="ab_browser_state_"))
|
|
tmp_auth_file = temp_dir / "auth.json"
|
|
env["AUTH_STORAGE_OUTPUT_FILE"] = str(tmp_auth_file)
|
|
elif auth_output_file:
|
|
env["AUTH_STORAGE_OUTPUT_FILE"] = str(auth_output_file)
|
|
else:
|
|
temp_dir = temp_dir or Path(tempfile.mkdtemp(prefix="ab_browser_state_"))
|
|
tmp_auth_file = temp_dir / "auth.json"
|
|
env["AUTH_STORAGE_OUTPUT_FILE"] = str(tmp_auth_file)
|
|
|
|
try:
|
|
result = subprocess.run(
|
|
["node", str(state_script)],
|
|
env=env,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=120,
|
|
)
|
|
except subprocess.TimeoutExpired:
|
|
return False, None, "Browser state export timed out."
|
|
except FileNotFoundError:
|
|
return False, None, "Node.js was not found, so ArchiveBox could not extract browser state."
|
|
except Exception as err:
|
|
return False, None, f"Browser state export failed: {err}"
|
|
|
|
if result.returncode != 0:
|
|
message = (result.stderr or result.stdout or "").strip() or "Browser state export failed."
|
|
return False, None, message
|
|
|
|
auth_payload: dict | None = None
|
|
if cookies_output_file and tmp_cookies_file and tmp_cookies_file.exists():
|
|
_merge_netscape_cookies(cookies_output_file, tmp_cookies_file)
|
|
if auth_output_file and tmp_auth_file and tmp_auth_file.exists():
|
|
_merge_auth_storage(auth_output_file, tmp_auth_file)
|
|
auth_payload = _load_auth_storage(tmp_auth_file)
|
|
elif auth_output_file and auth_output_file.exists():
|
|
auth_payload = _load_auth_storage(auth_output_file)
|
|
elif tmp_auth_file and tmp_auth_file.exists():
|
|
auth_payload = _load_auth_storage(tmp_auth_file)
|
|
|
|
if temp_dir and temp_dir.exists():
|
|
shutil.rmtree(temp_dir, ignore_errors=True)
|
|
|
|
return True, auth_payload, (result.stderr or result.stdout or "").strip()
|
|
|
|
|
|
def _list_profile_names(user_data_dir: Path) -> list[str]:
|
|
if not user_data_dir.exists() or not user_data_dir.is_dir():
|
|
return []
|
|
|
|
profiles: list[str] = []
|
|
for child in sorted(user_data_dir.iterdir(), key=lambda path: path.name.lower()):
|
|
if not child.is_dir():
|
|
continue
|
|
if child.name == "System Profile":
|
|
continue
|
|
if child.name == "Default" or child.name.startswith("Profile ") or child.name.startswith("Guest Profile"):
|
|
if _looks_like_profile_dir(child):
|
|
profiles.append(child.name)
|
|
continue
|
|
if _looks_like_profile_dir(child):
|
|
profiles.append(child.name)
|
|
return profiles
|
|
|
|
|
|
def _looks_like_profile_dir(path: Path) -> bool:
|
|
if not path.exists() or not path.is_dir():
|
|
return False
|
|
|
|
marker_paths = (
|
|
path / "Preferences",
|
|
path / "History",
|
|
path / "Cookies",
|
|
path / "Network" / "Cookies",
|
|
path / "Local Storage",
|
|
path / "Session Storage",
|
|
)
|
|
|
|
if any(marker.exists() for marker in marker_paths):
|
|
return True
|
|
|
|
return any(path.name == prefix or path.name.startswith(prefix) for prefix in BROWSER_PROFILE_DIR_NAMES)
|
|
|
|
|
|
def _looks_like_cdp_url(value: str) -> bool:
|
|
parsed = urlparse(value)
|
|
return parsed.scheme in {"ws", "wss", "http", "https"} and bool(parsed.netloc)
|
|
|
|
|
|
def _parse_netscape_cookies(path: Path) -> dict[tuple[str, str, str], tuple[str, str, str, str, str, str, str]]:
|
|
cookies: dict[tuple[str, str, str], tuple[str, str, str, str, str, str, str]] = {}
|
|
if not path.exists():
|
|
return cookies
|
|
|
|
for line in path.read_text().splitlines():
|
|
if not line or line.startswith("#"):
|
|
continue
|
|
parts = line.split("\t")
|
|
if len(parts) < 7:
|
|
continue
|
|
domain, include_subdomains, cookie_path, secure, expiry, name, value = parts[:7]
|
|
cookies[(domain, cookie_path, name)] = (domain, include_subdomains, cookie_path, secure, expiry, name, value)
|
|
return cookies
|
|
|
|
|
|
def _write_netscape_cookies(
|
|
path: Path,
|
|
cookies: dict[tuple[str, str, str], tuple[str, str, str, str, str, str, str]],
|
|
) -> None:
|
|
lines = list(NETSCAPE_COOKIE_HEADER)
|
|
for cookie in cookies.values():
|
|
lines.append("\t".join(cookie))
|
|
path.write_text("\n".join(lines) + "\n")
|
|
|
|
|
|
def _merge_netscape_cookies(existing_file: Path, new_file: Path) -> None:
|
|
existing = _parse_netscape_cookies(existing_file)
|
|
new = _parse_netscape_cookies(new_file)
|
|
existing.update(new)
|
|
_write_netscape_cookies(existing_file, existing)
|
|
|
|
|
|
def _merge_auth_storage(existing_file: Path, new_file: Path) -> None:
|
|
existing_payload = _load_auth_storage(existing_file)
|
|
new_payload = _load_auth_storage(new_file)
|
|
|
|
existing_local = existing_payload.setdefault("localStorage", {})
|
|
existing_session = existing_payload.setdefault("sessionStorage", {})
|
|
|
|
for origin, payload in (new_payload.get("localStorage") or {}).items():
|
|
existing_local[origin] = payload
|
|
for origin, payload in (new_payload.get("sessionStorage") or {}).items():
|
|
existing_session[origin] = payload
|
|
|
|
cookies = _merge_cookie_dicts(existing_payload.get("cookies") or [], new_payload.get("cookies") or [])
|
|
|
|
merged = {
|
|
**existing_payload,
|
|
**new_payload,
|
|
"cookies": cookies,
|
|
"localStorage": existing_local,
|
|
"sessionStorage": existing_session,
|
|
"user_agent": new_payload.get("user_agent") or existing_payload.get("user_agent") or "",
|
|
}
|
|
existing_file.write_text(json.dumps(merged, indent=2, sort_keys=True) + "\n")
|
|
|
|
|
|
def _load_auth_storage(path: Path) -> dict:
|
|
if not path.exists():
|
|
return {
|
|
"TYPE": "auth",
|
|
"cookies": [],
|
|
"localStorage": {},
|
|
"sessionStorage": {},
|
|
}
|
|
try:
|
|
payload = json.loads(path.read_text())
|
|
except json.JSONDecodeError:
|
|
return {
|
|
"TYPE": "auth",
|
|
"cookies": [],
|
|
"localStorage": {},
|
|
"sessionStorage": {},
|
|
}
|
|
if not isinstance(payload, dict):
|
|
return {
|
|
"TYPE": "auth",
|
|
"cookies": [],
|
|
"localStorage": {},
|
|
"sessionStorage": {},
|
|
}
|
|
return payload
|
|
|
|
|
|
def _merge_cookie_dicts(existing: list[dict], new: list[dict]) -> list[dict]:
|
|
merged: dict[tuple[str, str, str], dict] = {}
|
|
for cookie in existing:
|
|
key = (str(cookie.get("domain") or ""), str(cookie.get("path") or "/"), str(cookie.get("name") or ""))
|
|
merged[key] = cookie
|
|
for cookie in new:
|
|
key = (str(cookie.get("domain") or ""), str(cookie.get("path") or "/"), str(cookie.get("name") or ""))
|
|
merged[key] = cookie
|
|
return list(merged.values())
|
|
|
|
|
|
def _apply_imported_user_agent(persona: Persona, auth_payload: dict | None) -> bool:
|
|
if not auth_payload:
|
|
return False
|
|
|
|
user_agent = str(auth_payload.get("user_agent") or "").strip()
|
|
if not user_agent:
|
|
return False
|
|
|
|
config = dict(persona.config or {})
|
|
if config.get("USER_AGENT") == user_agent:
|
|
return False
|
|
|
|
config["USER_AGENT"] = user_agent
|
|
persona.config = config
|
|
persona.save(update_fields=["config"])
|
|
return True
|