Files
ArchiveBox/archivebox/cli/archivebox_persona.py
Nick Sweeting b749b26c5d wip
2026-03-23 03:58:32 -07:00

777 lines
26 KiB
Python

#!/usr/bin/env python3
"""
archivebox persona <action> [args...] [--filters]

Manage Persona records (browser profiles for archiving).

Actions:
    create  - Create Personas
    list    - List Personas as JSONL (with optional filters)
    update  - Update Personas from stdin JSONL
    delete  - Delete Personas from stdin JSONL

Examples:
    # Create a new persona
    archivebox persona create work
    archivebox persona create --import=chrome personal
    archivebox persona create --import=edge work

    # List all personas
    archivebox persona list

    # Delete a persona
    archivebox persona list --name=old | archivebox persona delete --yes
"""
__package__ = "archivebox.cli"
__command__ = "archivebox persona"
import os
import sys
import shutil
import platform
import subprocess
import tempfile
import json
from pathlib import Path
from collections.abc import Iterable
from collections import OrderedDict
import rich_click as click
from rich import print as rprint
from archivebox.cli.cli_utils import apply_filters
from archivebox.personas import importers as persona_importers
# =============================================================================
# Browser Profile Locations
# =============================================================================
def get_chrome_user_data_dir() -> Path | None:
"""Get the default Chrome user data directory for the current platform."""
system = platform.system()
home = Path.home()
if system == "Darwin": # macOS
candidates = [
home / "Library" / "Application Support" / "Google" / "Chrome",
home / "Library" / "Application Support" / "Chromium",
]
elif system == "Linux":
candidates = [
home / ".config" / "google-chrome",
home / ".config" / "chromium",
home / ".config" / "chrome",
home / "snap" / "chromium" / "common" / "chromium",
]
elif system == "Windows":
local_app_data = Path(os.environ.get("LOCALAPPDATA", home / "AppData" / "Local"))
candidates = [
local_app_data / "Google" / "Chrome" / "User Data",
local_app_data / "Chromium" / "User Data",
]
else:
candidates = []
for candidate in candidates:
if candidate.exists() and (candidate / "Default").exists():
return candidate
return None
def get_brave_user_data_dir() -> Path | None:
"""Get the default Brave user data directory for the current platform."""
system = platform.system()
home = Path.home()
if system == "Darwin":
candidates = [
home / "Library" / "Application Support" / "BraveSoftware" / "Brave-Browser",
]
elif system == "Linux":
candidates = [
home / ".config" / "BraveSoftware" / "Brave-Browser",
]
elif system == "Windows":
local_app_data = Path(os.environ.get("LOCALAPPDATA", home / "AppData" / "Local"))
candidates = [
local_app_data / "BraveSoftware" / "Brave-Browser" / "User Data",
]
else:
candidates = []
for candidate in candidates:
if candidate.exists() and (candidate / "Default").exists():
return candidate
return None
def get_edge_user_data_dir() -> Path | None:
"""Get the default Edge user data directory for the current platform."""
system = platform.system()
home = Path.home()
if system == "Darwin":
candidates = [
home / "Library" / "Application Support" / "Microsoft Edge",
]
elif system == "Linux":
candidates = [
home / ".config" / "microsoft-edge",
home / ".config" / "microsoft-edge-beta",
home / ".config" / "microsoft-edge-dev",
]
elif system == "Windows":
local_app_data = Path(os.environ.get("LOCALAPPDATA", home / "AppData" / "Local"))
candidates = [
local_app_data / "Microsoft" / "Edge" / "User Data",
]
else:
candidates = []
for candidate in candidates:
if candidate.exists() and (candidate / "Default").exists():
return candidate
return None
def get_browser_binary(browser: str) -> str | None:
system = platform.system()
home = Path.home()
browser = browser.lower()
if system == "Darwin":
candidates = {
"chrome": ["/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"],
"chromium": ["/Applications/Chromium.app/Contents/MacOS/Chromium"],
"brave": ["/Applications/Brave Browser.app/Contents/MacOS/Brave Browser"],
"edge": ["/Applications/Microsoft Edge.app/Contents/MacOS/Microsoft Edge"],
}.get(browser, [])
elif system == "Linux":
candidates = {
"chrome": [
"/usr/bin/google-chrome",
"/usr/bin/google-chrome-stable",
"/usr/bin/google-chrome-beta",
"/usr/bin/google-chrome-unstable",
],
"chromium": ["/usr/bin/chromium", "/usr/bin/chromium-browser"],
"brave": ["/usr/bin/brave-browser", "/usr/bin/brave-browser-beta", "/usr/bin/brave-browser-nightly"],
"edge": [
"/usr/bin/microsoft-edge",
"/usr/bin/microsoft-edge-stable",
"/usr/bin/microsoft-edge-beta",
"/usr/bin/microsoft-edge-dev",
],
}.get(browser, [])
elif system == "Windows":
local_app_data = Path(os.environ.get("LOCALAPPDATA", home / "AppData" / "Local"))
candidates = {
"chrome": [
str(local_app_data / "Google" / "Chrome" / "Application" / "chrome.exe"),
"C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe",
"C:\\Program Files (x86)\\Google\\Chrome\\Application\\chrome.exe",
],
"chromium": [str(local_app_data / "Chromium" / "Application" / "chrome.exe")],
"brave": [
str(local_app_data / "BraveSoftware" / "Brave-Browser" / "Application" / "brave.exe"),
"C:\\Program Files\\BraveSoftware\\Brave-Browser\\Application\\brave.exe",
"C:\\Program Files (x86)\\BraveSoftware\\Brave-Browser\\Application\\brave.exe",
],
"edge": [
str(local_app_data / "Microsoft" / "Edge" / "Application" / "msedge.exe"),
"C:\\Program Files\\Microsoft\\Edge\\Application\\msedge.exe",
"C:\\Program Files (x86)\\Microsoft\\Edge\\Application\\msedge.exe",
],
}.get(browser, [])
else:
candidates = []
for candidate in candidates:
if candidate and Path(candidate).exists():
return candidate
return None
# Maps each browser name accepted by --import to the function that locates
# that browser's user data directory on the current machine.
BROWSER_PROFILE_FINDERS = dict(
    chrome=get_chrome_user_data_dir,
    chromium=get_chrome_user_data_dir,  # Chromium is looked up in the same locations as Chrome
    brave=get_brave_user_data_dir,
    edge=get_edge_user_data_dir,
)

# Browser names treated as Chromium-based when importing a profile.
CHROMIUM_BROWSERS = {
    "chrome",
    "chromium",
    "brave",
    "edge",
}
# =============================================================================
# Cookie Extraction via CDP
# =============================================================================
# Comment preamble written at the top of every generated cookies.txt file.
# The "Format" line spells the separators as a literal backslash followed by
# "t" (not real tab characters), and the trailing empty string produces a
# blank line between the preamble and the cookie rows.
NETSCAPE_COOKIE_HEADER = [
    "# Netscape HTTP Cookie File",
    "# https://curl.se/docs/http-cookies.html",
    "# This file was generated by ArchiveBox persona cookie extraction",
    "#",
    r"# Format: domain\tincludeSubdomains\tpath\tsecure\texpiry\tname\tvalue",
    "",
]
def _parse_netscape_cookies(path: Path) -> "OrderedDict[tuple[str, str, str], tuple[str, str, str, str, str, str, str]]":
cookies = OrderedDict()
if not path.exists():
return cookies
for line in path.read_text().splitlines():
if not line or line.startswith("#"):
continue
parts = line.split("\t")
if len(parts) < 7:
continue
domain, include_subdomains, cookie_path, secure, expiry, name, value = parts[:7]
key = (domain, cookie_path, name)
cookies[key] = (domain, include_subdomains, cookie_path, secure, expiry, name, value)
return cookies
def _write_netscape_cookies(path: Path, cookies: "OrderedDict[tuple[str, str, str], tuple[str, str, str, str, str, str, str]]") -> None:
    """Serialize *cookies* to *path* as a Netscape cookies.txt file.

    The file starts with ``NETSCAPE_COOKIE_HEADER`` followed by one
    tab-joined row per cookie, in the mapping's iteration order, and ends
    with a trailing newline. Any existing file at *path* is overwritten.
    """
    rows = ["\t".join(fields) for fields in cookies.values()]
    content = "\n".join([*NETSCAPE_COOKIE_HEADER, *rows]) + "\n"
    path.write_text(content)
def _merge_netscape_cookies(existing_file: Path, new_file: Path) -> None:
    """Fold the cookies from *new_file* into *existing_file* in place.

    Both files are parsed, cookies from *new_file* replace same-keyed cookies
    from *existing_file* (new keys are appended), and the combined set is
    written back to *existing_file*. *new_file* is not modified.
    """
    merged = _parse_netscape_cookies(existing_file)
    # update() keeps the position of keys already present and appends the rest.
    merged.update(_parse_netscape_cookies(new_file))
    _write_netscape_cookies(existing_file, merged)
def _merge_chrome_extra_args(existing_extra: str, extra_arg: str) -> str:
    """Append *extra_arg* to a CHROME_ARGS_EXTRA value and return it as a JSON list.

    *existing_extra* may be empty, a JSON array, or a comma-separated list.
    A value that starts with "[" but is not valid JSON is treated as a
    comma-separated list.
    """
    existing_extra = existing_extra.strip()
    merged_args: list[str] = []
    if existing_extra:
        parsed = None
        if existing_extra.startswith("["):
            try:
                parsed = json.loads(existing_extra)
            except json.JSONDecodeError:
                parsed = None
        if isinstance(parsed, list):
            merged_args.extend(str(arg) for arg in parsed)
        else:
            merged_args.extend(part.strip() for part in existing_extra.split(",") if part.strip())
    merged_args.append(extra_arg)
    return json.dumps(merged_args)


def extract_cookies_via_cdp(
    user_data_dir: Path,
    output_file: Path,
    profile_dir: str | None = None,
    chrome_binary: str | None = None,
) -> bool:
    """
    Launch Chrome with the given user data dir and extract cookies via CDP.

    Runs the chrome plugin's ``extract_cookies.js`` with ``node`` (60 second
    timeout), passing its configuration through environment variables:
    NODE_MODULES_DIR, CHROME_USER_DATA_DIR, CHROME_HEADLESS, COOKIES_OUTPUT_FILE
    and, when applicable, CHROME_BINARY and CHROME_ARGS_EXTRA.

    Args:
        user_data_dir: Browser user data directory to read cookies from.
        output_file: cookies.txt to write. If it already exists, the script
            writes to a temporary file and the result is merged into
            *output_file* so previously saved cookies are kept.
        profile_dir: Optional profile directory name; appended to
            CHROME_ARGS_EXTRA as ``--profile-directory=<profile_dir>``.
        chrome_binary: Optional browser executable, exported as CHROME_BINARY.

    Returns:
        True if the script exited with status 0 (and any merge succeeded),
        False otherwise. Failures are reported on stderr, never raised.
    """
    from archivebox.config.common import STORAGE_CONFIG

    # Find the cookie extraction script
    chrome_plugin_dir = Path(__file__).parent.parent / "plugins" / "chrome"
    extract_script = chrome_plugin_dir / "extract_cookies.js"
    if not extract_script.exists():
        rprint(f"[yellow]Cookie extraction script not found at {extract_script}[/yellow]", file=sys.stderr)
        return False

    # Get node modules dir
    node_modules_dir = STORAGE_CONFIG.LIB_DIR / "npm" / "node_modules"

    # Set up environment
    env = os.environ.copy()
    env["NODE_MODULES_DIR"] = str(node_modules_dir)
    env["CHROME_USER_DATA_DIR"] = str(user_data_dir)
    env["CHROME_HEADLESS"] = "true"
    if chrome_binary:
        env["CHROME_BINARY"] = str(chrome_binary)
    if profile_dir:
        env["CHROME_ARGS_EXTRA"] = _merge_chrome_extra_args(
            env.get("CHROME_ARGS_EXTRA", ""),
            f"--profile-directory={profile_dir}",
        )

    # Never let the script overwrite an existing cookies file: write to a
    # scratch file instead and merge afterwards. The temp dir is created
    # immediately before the try block whose finally clause removes it.
    output_path = output_file
    temp_output = None
    temp_dir = None
    if output_file.exists():
        temp_dir = Path(tempfile.mkdtemp(prefix="ab_cookies_"))
        temp_output = temp_dir / "cookies.txt"
        output_path = temp_output
    env["COOKIES_OUTPUT_FILE"] = str(output_path)

    try:
        result = subprocess.run(
            ["node", str(extract_script)],
            env=env,
            capture_output=True,
            text=True,
            timeout=60,
        )
        if result.returncode != 0:
            rprint(f"[yellow]Cookie extraction failed: {result.stderr}[/yellow]", file=sys.stderr)
            return False
        if temp_output and temp_output.exists():
            _merge_netscape_cookies(output_file, temp_output)
        return True
    except subprocess.TimeoutExpired:
        rprint("[yellow]Cookie extraction timed out[/yellow]", file=sys.stderr)
        return False
    except FileNotFoundError:
        rprint("[yellow]Node.js not found. Cannot extract cookies.[/yellow]", file=sys.stderr)
        return False
    except Exception as e:
        # Best-effort boundary: report anything unexpected instead of crashing the CLI.
        rprint(f"[yellow]Cookie extraction error: {e}[/yellow]", file=sys.stderr)
        return False
    finally:
        if temp_dir and temp_dir.exists():
            shutil.rmtree(temp_dir, ignore_errors=True)
# =============================================================================
# Validation Helpers
# =============================================================================
def validate_persona_name(name: str) -> tuple[bool, str]:
    """
    Check that a persona name is safe to use as a directory name.

    The checks run in a fixed order and the first failing one determines the
    message: empty/whitespace-only, path separators, "..", leading dot, then
    NUL / newline / carriage-return characters.

    Returns:
        (is_valid, error_message): ``(True, "")`` for an acceptable name,
        otherwise ``(False, <reason>)``.
    """
    if not (name and name.strip()):
        return False, "Persona name cannot be empty"

    # (rejection predicate, error message), evaluated top to bottom.
    rejection_rules = (
        (
            lambda n: "/" in n or "\\" in n,
            "Persona name cannot contain path separators (/ or \\)",
        ),
        (
            lambda n: ".." in n,
            "Persona name cannot contain parent directory references (..)",
        ),
        (
            lambda n: n.startswith("."),
            "Persona name cannot start with a dot (.)",
        ),
        (
            lambda n: any(ch in n for ch in ("\x00", "\n", "\r")),
            "Persona name contains invalid characters",
        ),
    )
    for is_rejected, message in rejection_rules:
        if is_rejected(name):
            return False, message
    return True, ""
def ensure_path_within_personas_dir(persona_path: Path) -> bool:
    """
    Verify that a persona path is strictly inside PERSONAS_DIR.

    This is a safety check to prevent path traversal attacks where
    a malicious persona name could cause operations on paths outside
    the expected PERSONAS_DIR.

    Both paths are resolved (symlinks and ".." collapsed) before comparing.
    PERSONAS_DIR itself is rejected: it is the container for personas, not a
    persona, and accepting it would let a caller that deletes the checked
    path remove every persona at once.

    Returns:
        True if the path is a descendant of PERSONAS_DIR, False otherwise
        (including when either path cannot be resolved).
    """
    from archivebox.config.constants import CONSTANTS

    try:
        # Resolve both paths to absolute paths
        personas_dir = CONSTANTS.PERSONAS_DIR.resolve()
        resolved_path = persona_path.resolve()
    except (OSError, ValueError, RuntimeError):
        # Fail closed if the filesystem cannot resolve either path.
        return False

    # is_relative_to() is also true for equal paths, so exclude the root explicitly.
    if resolved_path == personas_dir:
        return False
    return resolved_path.is_relative_to(personas_dir)
# =============================================================================
# CREATE
# =============================================================================
def create_personas(
    names: Iterable[str],
    import_from: str | None = None,
    profile: str | None = None,
) -> int:
    """
    Create a Persona for each given name, optionally importing a browser profile.

    Names are stripped; blank names are skipped silently and names rejected by
    ``persona_importers.validate_persona_name`` are skipped with an error
    message. Existing personas are reused rather than recreated. When
    *import_from* names a supported browser, its profile (*profile*, or
    "Default" when that folder exists and none was given) is imported into
    every persona processed. When stdout is not a TTY, one JSONL record per
    persona is written to stdout; all status messages go to stderr.

    Exit codes:
        0: Finished processing the names
        1: No names given, unknown browser, browser profile directory not
           found, or a profile import raised
    """
    from archivebox.misc.jsonl import write_record
    from archivebox.personas.models import Persona

    emit_jsonl = not sys.stdout.isatty()
    requested_names = list(names) if names else []
    if not requested_names:
        rprint("[yellow]No persona names provided. Pass names as arguments.[/yellow]", file=sys.stderr)
        return 1

    # Resolve the browser to import from (if any) before touching the database.
    source_profile_dir = None
    if import_from:
        import_from = import_from.lower()
        find_profile_dir = BROWSER_PROFILE_FINDERS.get(import_from)
        if find_profile_dir is None:
            supported = ", ".join(BROWSER_PROFILE_FINDERS)
            rprint(f"[red]Unknown browser: {import_from}[/red]", file=sys.stderr)
            rprint(f"[dim]Supported browsers: {supported}[/dim]", file=sys.stderr)
            return 1

        source_profile_dir = find_profile_dir()
        if not source_profile_dir:
            rprint(f"[red]Could not find {import_from} profile directory[/red]", file=sys.stderr)
            return 1
        rprint(f"[dim]Found {import_from} profile: {source_profile_dir}[/dim]", file=sys.stderr)

        if profile is None and (source_profile_dir / "Default").exists():
            profile = "Default"

        # The binary is only reported here; it is not passed to the importer.
        browser_binary = get_browser_binary(import_from)
        if browser_binary:
            rprint(f"[dim]Using {import_from} binary: {browser_binary}[/dim]", file=sys.stderr)

    # Whether to import is the same for every name, so decide once.
    should_import = import_from in CHROMIUM_BROWSERS and source_profile_dir is not None

    created_count = 0
    for raw_name in requested_names:
        name = raw_name.strip()
        if not name:
            continue

        # Reject names that could escape the personas directory.
        is_valid, error_msg = persona_importers.validate_persona_name(name)
        if not is_valid:
            rprint(f'[red]Invalid persona name "{name}": {error_msg}[/red]', file=sys.stderr)
            continue

        persona, created = Persona.objects.get_or_create(name=name)
        if not created:
            rprint(f"[dim]Persona already exists: {name}[/dim]", file=sys.stderr)
        else:
            persona.ensure_dirs()
            created_count += 1
            rprint(f"[green]Created persona: {name}[/green]", file=sys.stderr)

        cookies_file = Path(persona.path) / "cookies.txt"

        if should_import:
            try:
                import_source = persona_importers.resolve_browser_import_source(import_from, profile_dir=profile)
                import_result = persona_importers.import_persona_from_source(
                    persona,
                    import_source,
                    copy_profile=True,
                    import_cookies=True,
                    capture_storage=False,
                )
            except Exception as e:
                # An import failure aborts the whole command, not just this persona.
                rprint(f"[red]Failed to import browser profile: {e}[/red]", file=sys.stderr)
                return 1

            if import_result.profile_copied:
                rprint("[green]Copied browser profile to persona[/green]", file=sys.stderr)
            if import_result.cookies_imported:
                rprint(f"[green]Extracted cookies to {cookies_file}[/green]", file=sys.stderr)
            elif not import_result.profile_copied:
                rprint("[yellow]Could not import cookies automatically.[/yellow]", file=sys.stderr)
            for warning in import_result.warnings:
                rprint(f"[yellow]{warning}[/yellow]", file=sys.stderr)

        if emit_jsonl:
            write_record(
                {
                    "id": str(persona.id) if hasattr(persona, "id") else None,
                    "name": persona.name,
                    "path": str(persona.path),
                    "CHROME_USER_DATA_DIR": persona.CHROME_USER_DATA_DIR,
                    "COOKIES_FILE": persona.COOKIES_FILE,
                },
            )

    rprint(f"[green]Created {created_count} new persona(s)[/green]", file=sys.stderr)
    return 0
# =============================================================================
# LIST
# =============================================================================
def list_personas(
    name: str | None = None,
    name__icontains: str | None = None,
    limit: int | None = None,
) -> int:
    """
    Print Personas ordered by name, optionally filtered and limited.

    On a TTY each persona is printed as a human-readable line showing whether
    ``COOKIES_FILE`` is set and whether its ``CHROME_USER_DATA_DIR`` exists on
    disk. Otherwise one JSONL record per persona is written to stdout. A
    summary count is always written to stderr.

    Exit codes:
        0: Success (even if no results)
    """
    from archivebox.misc.jsonl import write_record
    from archivebox.personas.models import Persona

    interactive = sys.stdout.isatty()
    personas = apply_filters(
        Persona.objects.all().order_by("name"),
        {
            "name": name,
            "name__icontains": name__icontains,
        },
        limit=limit,
    )

    listed = 0
    for listed, persona in enumerate(personas, start=1):
        cookies_status = "[green]✓[/green]" if persona.COOKIES_FILE else "[dim]✗[/dim]"
        chrome_status = "[green]✓[/green]" if Path(persona.CHROME_USER_DATA_DIR).exists() else "[dim]✗[/dim]"

        if not interactive:
            write_record(
                {
                    "id": str(persona.id) if hasattr(persona, "id") else None,
                    "name": persona.name,
                    "path": str(persona.path),
                    "CHROME_USER_DATA_DIR": persona.CHROME_USER_DATA_DIR,
                    "COOKIES_FILE": persona.COOKIES_FILE,
                },
            )
            continue

        rprint(f"[cyan]{persona.name:20}[/cyan] cookies:{cookies_status} chrome:{chrome_status} [dim]{persona.path}[/dim]")

    rprint(f"[dim]Listed {listed} persona(s)[/dim]", file=sys.stderr)
    return 0
# =============================================================================
# UPDATE
# =============================================================================
def update_personas(name: str | None = None) -> int:
    """
    Update Personas from stdin JSONL.

    Each stdin record identifies a Persona by ``id`` (preferred) or ``name``;
    records with neither are ignored. The only update applied is the rename
    requested via *name*: the persona's directory is moved to its new path
    and the record is saved. Every persona found is saved and counted, even
    when *name* is not given. When stdout is not a TTY, a JSONL record is
    written for each saved persona.

    A rename is refused, and the persona left untouched, when the new name is
    rejected by ``persona_importers.validate_persona_name`` or when the
    destination directory already exists. In the second case ``shutil.move``
    would otherwise place the old directory *inside* the existing one.

    Exit codes:
        0: Records were processed (individual records may have been skipped)
        1: No input records
    """
    from archivebox.misc.jsonl import read_stdin, write_record
    from archivebox.personas.models import Persona

    is_tty = sys.stdout.isatty()
    records = list(read_stdin())
    if not records:
        rprint("[yellow]No records provided via stdin[/yellow]", file=sys.stderr)
        return 1

    # The new name is the same for every record, so validate it only once.
    name_is_valid, name_error = True, ""
    if name:
        name_is_valid, name_error = persona_importers.validate_persona_name(name)

    updated_count = 0
    for record in records:
        persona_id = record.get("id")
        old_name = record.get("name")
        if not persona_id and not old_name:
            continue

        try:
            if persona_id:
                persona = Persona.objects.get(id=persona_id)
            else:
                persona = Persona.objects.get(name=old_name)
        except Persona.DoesNotExist:
            rprint(f"[yellow]Persona not found: {persona_id or old_name}[/yellow]", file=sys.stderr)
            continue

        # Apply updates from CLI flags
        if name:
            if not name_is_valid:
                rprint(f'[red]Invalid new persona name "{name}": {name_error}[/red]', file=sys.stderr)
                continue

            # Rename the persona directory too
            previous_name = persona.name
            old_path = persona.path
            persona.name = name
            new_path = persona.path
            if old_path != new_path and old_path.exists():
                if new_path.exists():
                    # Moving onto an existing directory would nest the old
                    # directory inside it, so skip this record unsaved.
                    rprint(
                        f'[red]Cannot rename persona "{previous_name}" to "{name}": {new_path} already exists[/red]',
                        file=sys.stderr,
                    )
                    continue
                shutil.move(str(old_path), str(new_path))

        persona.save()
        updated_count += 1

        if not is_tty:
            write_record(
                {
                    "id": str(persona.id) if hasattr(persona, "id") else None,
                    "name": persona.name,
                    "path": str(persona.path),
                },
            )

    rprint(f"[green]Updated {updated_count} persona(s)[/green]", file=sys.stderr)
    return 0
# =============================================================================
# DELETE
# =============================================================================
def delete_personas(yes: bool = False, dry_run: bool = False) -> int:
    """
    Delete the Personas identified by stdin JSONL records.

    Each record selects a persona by ``id`` or, when it has no id, by
    ``name``. With *dry_run* the matches are only listed. Otherwise *yes*
    must be set, and each matching persona has its directory removed from
    disk and its database record deleted. A persona whose path fails the
    ``ensure_path_within_personas_dir`` check is skipped entirely.

    Exit codes:
        0: Success (including dry runs and "no matching personas")
        1: No input, no usable ids/names, or missing --yes flag
    """
    from django.db.models import Q

    from archivebox.misc.jsonl import read_stdin
    from archivebox.personas.models import Persona

    records = list(read_stdin())
    if not records:
        rprint("[yellow]No records provided via stdin[/yellow]", file=sys.stderr)
        return 1

    # A record is matched by id when it has one, otherwise by name.
    persona_ids = [rec["id"] for rec in records if rec.get("id")]
    persona_names = [rec["name"] for rec in records if not rec.get("id") and rec.get("name")]
    if not (persona_ids or persona_names):
        rprint("[yellow]No valid persona IDs or names in input[/yellow]", file=sys.stderr)
        return 1

    selector = Q()
    if persona_ids:
        selector |= Q(id__in=persona_ids)
    if persona_names:
        selector |= Q(name__in=persona_names)

    matches = Persona.objects.filter(selector)
    match_count = matches.count()
    if match_count == 0:
        rprint("[yellow]No matching personas found[/yellow]", file=sys.stderr)
        return 0

    if dry_run:
        rprint(f"[yellow]Would delete {match_count} persona(s) (dry run)[/yellow]", file=sys.stderr)
        for persona in matches:
            rprint(f" {persona.name} ({persona.path})", file=sys.stderr)
        return 0

    if not yes:
        rprint("[red]Use --yes to confirm deletion[/red]", file=sys.stderr)
        return 1

    # Remove each persona's directory first, then its database record.
    deleted_count = 0
    for persona in matches:
        target_dir = persona.path

        # Never rmtree anything that is not inside PERSONAS_DIR.
        if not ensure_path_within_personas_dir(target_dir):
            rprint(f'[red]Security error: persona path "{target_dir}" is outside PERSONAS_DIR. Skipping deletion.[/red]', file=sys.stderr)
            continue

        if target_dir.exists():
            shutil.rmtree(target_dir)
        persona.delete()
        deleted_count += 1

    rprint(f"[green]Deleted {deleted_count} persona(s)[/green]", file=sys.stderr)
    return 0
# =============================================================================
# CLI Commands
# =============================================================================
# Root click group; the create/list/update/delete subcommands attach to it.
@click.group()
def main():
    """Manage Persona records (browser profiles)."""
@main.command("create")
@click.argument("names", nargs=-1)
@click.option("--import", "import_from", help="Import profile from browser (chrome, chromium, brave, edge)")
@click.option("--profile", help="Profile directory name under the user data dir (e.g. Default, Profile 1)")
def create_cmd(names: tuple, import_from: str | None, profile: str | None):
    """Create Personas, optionally importing from a browser profile."""
    # The helper's return value becomes the process exit status.
    exit_code = create_personas(names, import_from=import_from, profile=profile)
    sys.exit(exit_code)
@main.command("list")
@click.option("--name", help="Filter by exact name")
@click.option("--name__icontains", help="Filter by name contains")
@click.option("--limit", "-n", type=int, help="Limit number of results")
def list_cmd(name: str | None, name__icontains: str | None, limit: int | None):
    """List Personas as JSONL."""
    # The helper's return value becomes the process exit status.
    exit_code = list_personas(name=name, name__icontains=name__icontains, limit=limit)
    sys.exit(exit_code)
@main.command("update")
@click.option("--name", "-n", help="Set new name")
def update_cmd(name: str | None):
    """Update Personas from stdin JSONL."""
    # The helper's return value becomes the process exit status.
    exit_code = update_personas(name=name)
    sys.exit(exit_code)
@main.command("delete")
@click.option("--yes", "-y", is_flag=True, help="Confirm deletion")
@click.option("--dry-run", is_flag=True, help="Show what would be deleted")
def delete_cmd(yes: bool, dry_run: bool):
    """Delete Personas from stdin JSONL."""
    # The helper's return value becomes the process exit status.
    exit_code = delete_personas(yes=yes, dry_run=dry_run)
    sys.exit(exit_code)
if __name__ == "__main__":
    # Running this module directly invokes the click command group.
    main()