#!/usr/bin/env python3 """ archivebox persona [args...] [--filters] Manage Persona records (browser profiles for archiving). Actions: create - Create Personas list - List Personas as JSONL (with optional filters) update - Update Personas from stdin JSONL delete - Delete Personas from stdin JSONL Examples: # Create a new persona archivebox persona create work archivebox persona create --import=chrome personal archivebox persona create --import=edge work # List all personas archivebox persona list # Delete a persona archivebox persona list --name=old | archivebox persona delete --yes """ __package__ = "archivebox.cli" __command__ = "archivebox persona" import os import sys import shutil import platform import subprocess import tempfile import json from pathlib import Path from collections.abc import Iterable from collections import OrderedDict import rich_click as click from rich import print as rprint from archivebox.cli.cli_utils import apply_filters from archivebox.personas import importers as persona_importers # ============================================================================= # Browser Profile Locations # ============================================================================= def get_chrome_user_data_dir() -> Path | None: """Get the default Chrome user data directory for the current platform.""" system = platform.system() home = Path.home() if system == "Darwin": # macOS candidates = [ home / "Library" / "Application Support" / "Google" / "Chrome", home / "Library" / "Application Support" / "Chromium", ] elif system == "Linux": candidates = [ home / ".config" / "google-chrome", home / ".config" / "chromium", home / ".config" / "chrome", home / "snap" / "chromium" / "common" / "chromium", ] elif system == "Windows": local_app_data = Path(os.environ.get("LOCALAPPDATA", home / "AppData" / "Local")) candidates = [ local_app_data / "Google" / "Chrome" / "User Data", local_app_data / "Chromium" / "User Data", ] else: candidates = [] for candidate in 
candidates: if candidate.exists() and (candidate / "Default").exists(): return candidate return None def get_brave_user_data_dir() -> Path | None: """Get the default Brave user data directory for the current platform.""" system = platform.system() home = Path.home() if system == "Darwin": candidates = [ home / "Library" / "Application Support" / "BraveSoftware" / "Brave-Browser", ] elif system == "Linux": candidates = [ home / ".config" / "BraveSoftware" / "Brave-Browser", ] elif system == "Windows": local_app_data = Path(os.environ.get("LOCALAPPDATA", home / "AppData" / "Local")) candidates = [ local_app_data / "BraveSoftware" / "Brave-Browser" / "User Data", ] else: candidates = [] for candidate in candidates: if candidate.exists() and (candidate / "Default").exists(): return candidate return None def get_edge_user_data_dir() -> Path | None: """Get the default Edge user data directory for the current platform.""" system = platform.system() home = Path.home() if system == "Darwin": candidates = [ home / "Library" / "Application Support" / "Microsoft Edge", ] elif system == "Linux": candidates = [ home / ".config" / "microsoft-edge", home / ".config" / "microsoft-edge-beta", home / ".config" / "microsoft-edge-dev", ] elif system == "Windows": local_app_data = Path(os.environ.get("LOCALAPPDATA", home / "AppData" / "Local")) candidates = [ local_app_data / "Microsoft" / "Edge" / "User Data", ] else: candidates = [] for candidate in candidates: if candidate.exists() and (candidate / "Default").exists(): return candidate return None def get_browser_binary(browser: str) -> str | None: system = platform.system() home = Path.home() browser = browser.lower() if system == "Darwin": candidates = { "chrome": ["/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"], "chromium": ["/Applications/Chromium.app/Contents/MacOS/Chromium"], "brave": ["/Applications/Brave Browser.app/Contents/MacOS/Brave Browser"], "edge": ["/Applications/Microsoft 
Edge.app/Contents/MacOS/Microsoft Edge"], }.get(browser, []) elif system == "Linux": candidates = { "chrome": [ "/usr/bin/google-chrome", "/usr/bin/google-chrome-stable", "/usr/bin/google-chrome-beta", "/usr/bin/google-chrome-unstable", ], "chromium": ["/usr/bin/chromium", "/usr/bin/chromium-browser"], "brave": ["/usr/bin/brave-browser", "/usr/bin/brave-browser-beta", "/usr/bin/brave-browser-nightly"], "edge": [ "/usr/bin/microsoft-edge", "/usr/bin/microsoft-edge-stable", "/usr/bin/microsoft-edge-beta", "/usr/bin/microsoft-edge-dev", ], }.get(browser, []) elif system == "Windows": local_app_data = Path(os.environ.get("LOCALAPPDATA", home / "AppData" / "Local")) candidates = { "chrome": [ str(local_app_data / "Google" / "Chrome" / "Application" / "chrome.exe"), "C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe", "C:\\Program Files (x86)\\Google\\Chrome\\Application\\chrome.exe", ], "chromium": [str(local_app_data / "Chromium" / "Application" / "chrome.exe")], "brave": [ str(local_app_data / "BraveSoftware" / "Brave-Browser" / "Application" / "brave.exe"), "C:\\Program Files\\BraveSoftware\\Brave-Browser\\Application\\brave.exe", "C:\\Program Files (x86)\\BraveSoftware\\Brave-Browser\\Application\\brave.exe", ], "edge": [ str(local_app_data / "Microsoft" / "Edge" / "Application" / "msedge.exe"), "C:\\Program Files\\Microsoft\\Edge\\Application\\msedge.exe", "C:\\Program Files (x86)\\Microsoft\\Edge\\Application\\msedge.exe", ], }.get(browser, []) else: candidates = [] for candidate in candidates: if candidate and Path(candidate).exists(): return candidate return None BROWSER_PROFILE_FINDERS = { "chrome": get_chrome_user_data_dir, "chromium": get_chrome_user_data_dir, # Same locations "brave": get_brave_user_data_dir, "edge": get_edge_user_data_dir, } CHROMIUM_BROWSERS = {"chrome", "chromium", "brave", "edge"} # ============================================================================= # Cookie Extraction via CDP # 
# =============================================================================

# Header lines prepended to every generated cookies.txt (Netscape cookie file
# format, as consumed by curl/wget and Chromium tooling).
NETSCAPE_COOKIE_HEADER = [
    "# Netscape HTTP Cookie File",
    "# https://curl.se/docs/http-cookies.html",
    "# This file was generated by ArchiveBox persona cookie extraction",
    "#",
    "# Format: domain\\tincludeSubdomains\\tpath\\tsecure\\texpiry\\tname\\tvalue",
    "",
]


def _parse_netscape_cookies(path: Path) -> "OrderedDict[tuple[str, str, str], tuple[str, str, str, str, str, str, str]]":
    """Parse a Netscape-format cookies file into an ordered mapping.

    Keys are (domain, path, name) so that re-parsing and merging de-duplicates
    cookies; values are the full 7-tuple of tab-separated fields. Missing files
    and malformed/comment lines are skipped silently (best-effort parsing).
    """
    cookies: "OrderedDict[tuple[str, str, str], tuple[str, str, str, str, str, str, str]]" = OrderedDict()
    if not path.exists():
        return cookies
    for line in path.read_text().splitlines():
        # Skip blank lines and the "# ..." header/comment lines
        if not line or line.startswith("#"):
            continue
        parts = line.split("\t")
        if len(parts) < 7:
            continue
        domain, include_subdomains, cookie_path, secure, expiry, name, value = parts[:7]
        key = (domain, cookie_path, name)
        cookies[key] = (domain, include_subdomains, cookie_path, secure, expiry, name, value)
    return cookies


def _write_netscape_cookies(path: Path, cookies: "OrderedDict[tuple[str, str, str], tuple[str, str, str, str, str, str, str]]") -> None:
    """Write cookies back out in Netscape format, with the standard header."""
    lines = list(NETSCAPE_COOKIE_HEADER)
    for cookie in cookies.values():
        lines.append("\t".join(cookie))
    path.write_text("\n".join(lines) + "\n")


def _merge_netscape_cookies(existing_file: Path, new_file: Path) -> None:
    """Merge new_file's cookies into existing_file (new values win on conflict)."""
    existing = _parse_netscape_cookies(existing_file)
    new = _parse_netscape_cookies(new_file)
    for key, cookie in new.items():
        existing[key] = cookie
    _write_netscape_cookies(existing_file, existing)


def extract_cookies_via_cdp(
    user_data_dir: Path,
    output_file: Path,
    profile_dir: str | None = None,
    chrome_binary: str | None = None,
    timeout: int = 60,
) -> bool:
    """
    Launch Chrome with the given user data dir and extract cookies via CDP.

    Runs the plugins/chrome/extract_cookies.js helper under Node.js, passing
    configuration via environment variables. If output_file already exists,
    extraction goes to a temp file which is then merged into output_file so
    previously-extracted cookies are preserved.

    Args:
        user_data_dir: Chrome user data dir to launch with.
        output_file: Destination cookies.txt (Netscape format).
        profile_dir: Optional profile subdirectory (e.g. "Default", "Profile 1").
        chrome_binary: Optional explicit Chrome binary path.
        timeout: Max seconds to wait for the extraction subprocess (default 60).

    Returns True if successful, False otherwise.
    """
    from archivebox.config.common import STORAGE_CONFIG

    # Find the cookie extraction script
    chrome_plugin_dir = Path(__file__).parent.parent / "plugins" / "chrome"
    extract_script = chrome_plugin_dir / "extract_cookies.js"
    if not extract_script.exists():
        rprint(f"[yellow]Cookie extraction script not found at {extract_script}[/yellow]", file=sys.stderr)
        return False

    # Get node modules dir
    node_modules_dir = STORAGE_CONFIG.LIB_DIR / "npm" / "node_modules"

    # Set up environment for the Node.js helper script
    env = os.environ.copy()
    env["NODE_MODULES_DIR"] = str(node_modules_dir)
    env["CHROME_USER_DATA_DIR"] = str(user_data_dir)
    env["CHROME_HEADLESS"] = "true"
    if chrome_binary:
        env["CHROME_BINARY"] = str(chrome_binary)

    # If the output file already exists, extract into a temp file first and
    # merge afterwards, so existing cookies are kept rather than clobbered.
    output_path = output_file
    temp_output = None
    temp_dir = None
    if output_file.exists():
        temp_dir = Path(tempfile.mkdtemp(prefix="ab_cookies_"))
        temp_output = temp_dir / "cookies.txt"
        output_path = temp_output

    if profile_dir:
        extra_arg = f"--profile-directory={profile_dir}"
        # CHROME_ARGS_EXTRA may already be set by the caller's environment as
        # either a JSON array or a comma-separated string; normalize to a list,
        # append our flag, and re-serialize as JSON.
        existing_extra = env.get("CHROME_ARGS_EXTRA", "").strip()
        args_list = []
        if existing_extra:
            if existing_extra.startswith("["):
                try:
                    parsed = json.loads(existing_extra)
                    if isinstance(parsed, list):
                        args_list.extend(str(x) for x in parsed)
                except Exception:
                    # Not valid JSON after all — fall back to comma-splitting
                    args_list.extend([s.strip() for s in existing_extra.split(",") if s.strip()])
            else:
                args_list.extend([s.strip() for s in existing_extra.split(",") if s.strip()])
        args_list.append(extra_arg)
        env["CHROME_ARGS_EXTRA"] = json.dumps(args_list)

    env["COOKIES_OUTPUT_FILE"] = str(output_path)

    try:
        result = subprocess.run(
            ["node", str(extract_script)],
            env=env,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
        if result.returncode == 0:
            if temp_output and temp_output.exists():
                _merge_netscape_cookies(output_file, temp_output)
            return True
        else:
            rprint(f"[yellow]Cookie extraction failed: {result.stderr}[/yellow]", file=sys.stderr)
            return False
    except subprocess.TimeoutExpired:
        rprint("[yellow]Cookie extraction timed out[/yellow]", file=sys.stderr)
        return False
    except FileNotFoundError:
        # Raised when the `node` executable itself is missing from PATH
        rprint("[yellow]Node.js not found. Cannot extract cookies.[/yellow]", file=sys.stderr)
        return False
    except Exception as e:
        rprint(f"[yellow]Cookie extraction error: {e}[/yellow]", file=sys.stderr)
        return False
    finally:
        if temp_dir and temp_dir.exists():
            shutil.rmtree(temp_dir, ignore_errors=True)


# =============================================================================
# Validation Helpers
# =============================================================================


def validate_persona_name(name: str) -> tuple[bool, str]:
    """
    Validate persona name to prevent path traversal attacks.

    Returns:
        (is_valid, error_message): tuple indicating if name is valid
    """
    if not name or not name.strip():
        return False, "Persona name cannot be empty"

    # Check for path separators
    if "/" in name or "\\" in name:
        return False, "Persona name cannot contain path separators (/ or \\)"

    # Check for parent directory references
    if ".." in name:
        return False, "Persona name cannot contain parent directory references (..)"

    # Check for hidden files/directories
    if name.startswith("."):
        return False, "Persona name cannot start with a dot (.)"

    # Ensure name doesn't contain null bytes or other dangerous chars
    if "\x00" in name or "\n" in name or "\r" in name:
        return False, "Persona name contains invalid characters"

    return True, ""


def ensure_path_within_personas_dir(persona_path: Path) -> bool:
    """
    Verify that a persona path is within PERSONAS_DIR.

    This is a safety check to prevent path traversal attacks where a malicious
    persona name could cause operations on paths outside the expected
    PERSONAS_DIR.

    Returns:
        True if path is safe, False otherwise
    """
    from archivebox.config.constants import CONSTANTS

    try:
        # Resolve both paths to absolute paths (follows symlinks)
        personas_dir = CONSTANTS.PERSONAS_DIR.resolve()
        resolved_path = persona_path.resolve()
        # Check if resolved_path is a child of personas_dir
        return resolved_path.is_relative_to(personas_dir)
    except (ValueError, RuntimeError):
        return False


# =============================================================================
# CREATE
# =============================================================================


def create_personas(
    names: Iterable[str],
    import_from: str | None = None,
    profile: str | None = None,
) -> int:
    """
    Create Personas from names.

    If --import is specified, copy the browser profile to the persona
    directory and extract cookies.

    Exit codes:
        0: Success
        1: Failure
    """
    from archivebox.misc.jsonl import write_record
    from archivebox.personas.models import Persona

    is_tty = sys.stdout.isatty()

    name_list = list(names) if names else []
    if not name_list:
        rprint("[yellow]No persona names provided. Pass names as arguments.[/yellow]", file=sys.stderr)
        return 1

    # Validate import source if specified
    source_profile_dir = None
    if import_from:
        import_from = import_from.lower()
        if import_from not in BROWSER_PROFILE_FINDERS:
            rprint(f"[red]Unknown browser: {import_from}[/red]", file=sys.stderr)
            rprint(f"[dim]Supported browsers: {', '.join(BROWSER_PROFILE_FINDERS.keys())}[/dim]", file=sys.stderr)
            return 1
        source_profile_dir = BROWSER_PROFILE_FINDERS[import_from]()
        if not source_profile_dir:
            rprint(f"[red]Could not find {import_from} profile directory[/red]", file=sys.stderr)
            return 1
        rprint(f"[dim]Found {import_from} profile: {source_profile_dir}[/dim]", file=sys.stderr)
        # Default to the browser's "Default" profile when none was requested
        if profile is None and (source_profile_dir / "Default").exists():
            profile = "Default"
        browser_binary = get_browser_binary(import_from)
        if browser_binary:
            rprint(f"[dim]Using {import_from} binary: {browser_binary}[/dim]", file=sys.stderr)

    created_count = 0
    for name in name_list:
        name = name.strip()
        if not name:
            continue

        # Validate persona name to prevent path traversal
        is_valid, error_msg = persona_importers.validate_persona_name(name)
        if not is_valid:
            rprint(f'[red]Invalid persona name "{name}": {error_msg}[/red]', file=sys.stderr)
            continue

        persona, created = Persona.objects.get_or_create(name=name)
        if created:
            persona.ensure_dirs()
            created_count += 1
            rprint(f"[green]Created persona: {name}[/green]", file=sys.stderr)
        else:
            rprint(f"[dim]Persona already exists: {name}[/dim]", file=sys.stderr)

        cookies_file = Path(persona.path) / "cookies.txt"

        # Import browser profile if requested
        if import_from in CHROMIUM_BROWSERS and source_profile_dir is not None:
            try:
                import_source = persona_importers.resolve_browser_import_source(import_from, profile_dir=profile)
                import_result = persona_importers.import_persona_from_source(
                    persona,
                    import_source,
                    copy_profile=True,
                    import_cookies=True,
                    capture_storage=False,
                )
            except Exception as e:
                rprint(f"[red]Failed to import browser profile: {e}[/red]", file=sys.stderr)
                return 1
            if import_result.profile_copied:
                rprint("[green]Copied browser profile to persona[/green]", file=sys.stderr)
            if import_result.cookies_imported:
                rprint(f"[green]Extracted cookies to {cookies_file}[/green]", file=sys.stderr)
            elif not import_result.profile_copied:
                rprint("[yellow]Could not import cookies automatically.[/yellow]", file=sys.stderr)
            for warning in import_result.warnings:
                rprint(f"[yellow]{warning}[/yellow]", file=sys.stderr)

        # Emit JSONL when piped so output can feed `persona update/delete`
        if not is_tty:
            write_record(
                {
                    "id": str(persona.id) if hasattr(persona, "id") else None,
                    "name": persona.name,
                    "path": str(persona.path),
                    "CHROME_USER_DATA_DIR": persona.CHROME_USER_DATA_DIR,
                    "COOKIES_FILE": persona.COOKIES_FILE,
                },
            )

    rprint(f"[green]Created {created_count} new persona(s)[/green]", file=sys.stderr)
    return 0


# =============================================================================
# LIST
# =============================================================================


def list_personas(
    name: str | None = None,
    name__icontains: str | None = None,
    limit: int | None = None,
) -> int:
    """
    List Personas as JSONL with optional filters.

    Exit codes:
        0: Success (even if no results)
    """
    from archivebox.misc.jsonl import write_record
    from archivebox.personas.models import Persona

    is_tty = sys.stdout.isatty()

    queryset = Persona.objects.all().order_by("name")

    # Apply filters
    filter_kwargs = {
        "name": name,
        "name__icontains": name__icontains,
    }
    queryset = apply_filters(queryset, filter_kwargs, limit=limit)

    count = 0
    for persona in queryset:
        cookies_status = "[green]✓[/green]" if persona.COOKIES_FILE else "[dim]✗[/dim]"
        chrome_status = "[green]✓[/green]" if Path(persona.CHROME_USER_DATA_DIR).exists() else "[dim]✗[/dim]"
        if is_tty:
            # Human-readable table row on a terminal
            rprint(f"[cyan]{persona.name:20}[/cyan] cookies:{cookies_status} chrome:{chrome_status} [dim]{persona.path}[/dim]")
        else:
            # Machine-readable JSONL when piped
            write_record(
                {
                    "id": str(persona.id) if hasattr(persona, "id") else None,
                    "name": persona.name,
                    "path": str(persona.path),
                    "CHROME_USER_DATA_DIR": persona.CHROME_USER_DATA_DIR,
                    "COOKIES_FILE": persona.COOKIES_FILE,
                },
            )
        count += 1

    rprint(f"[dim]Listed {count} persona(s)[/dim]", file=sys.stderr)
    return 0


# =============================================================================
# UPDATE
# =============================================================================


def update_personas(name: str | None = None) -> int:
    """
    Update Personas from stdin JSONL.

    Reads Persona records from stdin and applies updates.
    Uses PATCH semantics - only specified fields are updated.

    Exit codes:
        0: Success
        1: No input or error
    """
    from archivebox.misc.jsonl import read_stdin, write_record
    from archivebox.personas.models import Persona

    is_tty = sys.stdout.isatty()

    records = list(read_stdin())
    if not records:
        rprint("[yellow]No records provided via stdin[/yellow]", file=sys.stderr)
        return 1

    updated_count = 0
    for record in records:
        persona_id = record.get("id")
        old_name = record.get("name")
        if not persona_id and not old_name:
            continue

        try:
            # Prefer lookup by id, fall back to name
            if persona_id:
                persona = Persona.objects.get(id=persona_id)
            else:
                persona = Persona.objects.get(name=old_name)

            # Apply updates from CLI flags
            if name:
                # Validate new name to prevent path traversal
                is_valid, error_msg = persona_importers.validate_persona_name(name)
                if not is_valid:
                    rprint(f'[red]Invalid new persona name "{name}": {error_msg}[/red]', file=sys.stderr)
                    continue
                # Rename the persona directory too
                # NOTE(review): if stdin matches multiple personas, they would
                # all be renamed to the same --name; callers should pipe a
                # single record when renaming.
                old_path = persona.path
                persona.name = name
                new_path = persona.path
                if old_path.exists() and old_path != new_path:
                    shutil.move(str(old_path), str(new_path))

            persona.save()
            updated_count += 1

            if not is_tty:
                write_record(
                    {
                        "id": str(persona.id) if hasattr(persona, "id") else None,
                        "name": persona.name,
                        "path": str(persona.path),
                    },
                )
        except Persona.DoesNotExist:
            rprint(f"[yellow]Persona not found: {persona_id or old_name}[/yellow]", file=sys.stderr)
            continue

    rprint(f"[green]Updated {updated_count} persona(s)[/green]", file=sys.stderr)
    return 0


# =============================================================================
# DELETE
# =============================================================================


def delete_personas(yes: bool = False, dry_run: bool = False) -> int:
    """
    Delete Personas from stdin JSONL.

    Requires --yes flag to confirm deletion.

    Exit codes:
        0: Success
        1: No input or missing --yes flag
    """
    from archivebox.misc.jsonl import read_stdin
    from archivebox.personas.models import Persona

    records = list(read_stdin())
    if not records:
        rprint("[yellow]No records provided via stdin[/yellow]", file=sys.stderr)
        return 1

    # Collect persona IDs or names
    persona_ids = []
    persona_names = []
    for r in records:
        if r.get("id"):
            persona_ids.append(r["id"])
        elif r.get("name"):
            persona_names.append(r["name"])

    if not persona_ids and not persona_names:
        rprint("[yellow]No valid persona IDs or names in input[/yellow]", file=sys.stderr)
        return 1

    from django.db.models import Q

    # Match by id OR name in a single query
    query = Q()
    if persona_ids:
        query |= Q(id__in=persona_ids)
    if persona_names:
        query |= Q(name__in=persona_names)

    personas = Persona.objects.filter(query)
    count = personas.count()

    if count == 0:
        rprint("[yellow]No matching personas found[/yellow]", file=sys.stderr)
        return 0

    if dry_run:
        rprint(f"[yellow]Would delete {count} persona(s) (dry run)[/yellow]", file=sys.stderr)
        for persona in personas:
            rprint(f"  {persona.name} ({persona.path})", file=sys.stderr)
        return 0

    if not yes:
        rprint("[red]Use --yes to confirm deletion[/red]", file=sys.stderr)
        return 1

    # Delete persona directories and database records
    deleted_count = 0
    for persona in personas:
        persona_path = persona.path
        # Safety check: ensure path is within PERSONAS_DIR before deletion
        if not ensure_path_within_personas_dir(persona_path):
            rprint(f'[red]Security error: persona path "{persona_path}" is outside PERSONAS_DIR. Skipping deletion.[/red]', file=sys.stderr)
            continue
        if persona_path.exists():
            shutil.rmtree(persona_path)
        persona.delete()
        deleted_count += 1

    rprint(f"[green]Deleted {deleted_count} persona(s)[/green]", file=sys.stderr)
    return 0


# =============================================================================
# CLI Commands
# =============================================================================


@click.group()
def main():
    """Manage Persona records (browser profiles)."""
    pass


@main.command("create")
@click.argument("names", nargs=-1)
@click.option("--import", "import_from", help="Import profile from browser (chrome, chromium, brave, edge)")
@click.option("--profile", help="Profile directory name under the user data dir (e.g. Default, Profile 1)")
def create_cmd(names: tuple, import_from: str | None, profile: str | None):
    """Create Personas, optionally importing from a browser profile."""
    sys.exit(create_personas(names, import_from=import_from, profile=profile))


@main.command("list")
@click.option("--name", help="Filter by exact name")
@click.option("--name__icontains", help="Filter by name contains")
@click.option("--limit", "-n", type=int, help="Limit number of results")
def list_cmd(name: str | None, name__icontains: str | None, limit: int | None):
    """List Personas as JSONL."""
    sys.exit(list_personas(name=name, name__icontains=name__icontains, limit=limit))


@main.command("update")
@click.option("--name", "-n", help="Set new name")
def update_cmd(name: str | None):
    """Update Personas from stdin JSONL."""
    sys.exit(update_personas(name=name))


@main.command("delete")
@click.option("--yes", "-y", is_flag=True, help="Confirm deletion")
@click.option("--dry-run", is_flag=True, help="Show what would be deleted")
def delete_cmd(yes: bool, dry_run: bool):
    """Delete Personas from stdin JSONL."""
    sys.exit(delete_personas(yes=yes, dry_run=dry_run))


if __name__ == "__main__":
    main()