Files
ArchiveBox/archivebox/cli/archivebox_persona.py
Nick Sweeting 934e02695b fix lint
2026-03-15 18:45:29 -07:00

775 lines
26 KiB
Python

#!/usr/bin/env python3
"""
archivebox persona <action> [args...] [--filters]
Manage Persona records (browser profiles for archiving).
Actions:
create - Create Personas
list - List Personas as JSONL (with optional filters)
update - Update Personas from stdin JSONL
delete - Delete Personas from stdin JSONL
Examples:
# Create a new persona
archivebox persona create work
archivebox persona create --import=chrome personal
archivebox persona create --import=edge work
# List all personas
archivebox persona list
# Delete a persona
archivebox persona list --name=old | archivebox persona delete --yes
"""
__package__ = 'archivebox.cli'
__command__ = 'archivebox persona'
import os
import sys
import shutil
import platform
import subprocess
import tempfile
import json
from pathlib import Path
from typing import Optional, Iterable
from collections import OrderedDict
import rich_click as click
from rich import print as rprint
from archivebox.cli.cli_utils import apply_filters
# =============================================================================
# Browser Profile Locations
# =============================================================================
def get_chrome_user_data_dir() -> Optional[Path]:
    """Locate the default Chrome/Chromium user data directory, if any.

    Probes the conventional per-platform locations and returns the first
    directory that both exists and contains a 'Default' profile subdir.
    Returns None when no usable profile directory is found.
    """
    home = Path.home()
    system = platform.system()
    if system == 'Darwin':  # macOS
        search_paths = [
            home / 'Library' / 'Application Support' / 'Google' / 'Chrome',
            home / 'Library' / 'Application Support' / 'Chromium',
        ]
    elif system == 'Linux':
        search_paths = [
            home / '.config' / 'google-chrome',
            home / '.config' / 'chromium',
            home / '.config' / 'chrome',
            home / 'snap' / 'chromium' / 'common' / 'chromium',
        ]
    elif system == 'Windows':
        local_app_data = Path(os.environ.get('LOCALAPPDATA', home / 'AppData' / 'Local'))
        search_paths = [
            local_app_data / 'Google' / 'Chrome' / 'User Data',
            local_app_data / 'Chromium' / 'User Data',
        ]
    else:
        search_paths = []
    # A real user data dir always contains a 'Default' profile folder.
    return next(
        (p for p in search_paths if p.exists() and (p / 'Default').exists()),
        None,
    )
def get_brave_user_data_dir() -> Optional[Path]:
    """Locate the default Brave user data directory, if any.

    Probes the conventional per-platform locations and returns the first
    directory that both exists and contains a 'Default' profile subdir.
    Returns None when no usable profile directory is found.
    """
    home = Path.home()
    system = platform.system()
    if system == 'Darwin':
        search_paths = [
            home / 'Library' / 'Application Support' / 'BraveSoftware' / 'Brave-Browser',
        ]
    elif system == 'Linux':
        search_paths = [
            home / '.config' / 'BraveSoftware' / 'Brave-Browser',
        ]
    elif system == 'Windows':
        local_app_data = Path(os.environ.get('LOCALAPPDATA', home / 'AppData' / 'Local'))
        search_paths = [
            local_app_data / 'BraveSoftware' / 'Brave-Browser' / 'User Data',
        ]
    else:
        search_paths = []
    # A real user data dir always contains a 'Default' profile folder.
    return next(
        (p for p in search_paths if p.exists() and (p / 'Default').exists()),
        None,
    )
def get_edge_user_data_dir() -> Optional[Path]:
    """Locate the default Microsoft Edge user data directory, if any.

    Probes the conventional per-platform locations and returns the first
    directory that both exists and contains a 'Default' profile subdir.
    Returns None when no usable profile directory is found.
    """
    home = Path.home()
    system = platform.system()
    if system == 'Darwin':
        search_paths = [
            home / 'Library' / 'Application Support' / 'Microsoft Edge',
        ]
    elif system == 'Linux':
        search_paths = [
            home / '.config' / 'microsoft-edge',
            home / '.config' / 'microsoft-edge-beta',
            home / '.config' / 'microsoft-edge-dev',
        ]
    elif system == 'Windows':
        local_app_data = Path(os.environ.get('LOCALAPPDATA', home / 'AppData' / 'Local'))
        search_paths = [
            local_app_data / 'Microsoft' / 'Edge' / 'User Data',
        ]
    else:
        search_paths = []
    # A real user data dir always contains a 'Default' profile folder.
    return next(
        (p for p in search_paths if p.exists() and (p / 'Default').exists()),
        None,
    )
def get_browser_binary(browser: str) -> Optional[str]:
    """Find the executable for a given browser on the current platform.

    browser is matched case-insensitively against the known chromium-family
    names ('chrome', 'chromium', 'brave', 'edge').  Returns the path string
    of the first candidate binary that exists on disk, or None when the
    browser is unknown or not installed.
    """
    system = platform.system()
    home = Path.home()
    key = browser.lower()
    if system == 'Darwin':
        per_browser = {
            'chrome': ['/Applications/Google Chrome.app/Contents/MacOS/Google Chrome'],
            'chromium': ['/Applications/Chromium.app/Contents/MacOS/Chromium'],
            'brave': ['/Applications/Brave Browser.app/Contents/MacOS/Brave Browser'],
            'edge': ['/Applications/Microsoft Edge.app/Contents/MacOS/Microsoft Edge'],
        }
    elif system == 'Linux':
        per_browser = {
            'chrome': ['/usr/bin/google-chrome', '/usr/bin/google-chrome-stable', '/usr/bin/google-chrome-beta', '/usr/bin/google-chrome-unstable'],
            'chromium': ['/usr/bin/chromium', '/usr/bin/chromium-browser'],
            'brave': ['/usr/bin/brave-browser', '/usr/bin/brave-browser-beta', '/usr/bin/brave-browser-nightly'],
            'edge': ['/usr/bin/microsoft-edge', '/usr/bin/microsoft-edge-stable', '/usr/bin/microsoft-edge-beta', '/usr/bin/microsoft-edge-dev'],
        }
    elif system == 'Windows':
        local_app_data = Path(os.environ.get('LOCALAPPDATA', home / 'AppData' / 'Local'))
        per_browser = {
            'chrome': [
                str(local_app_data / 'Google' / 'Chrome' / 'Application' / 'chrome.exe'),
                'C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe',
                'C:\\Program Files (x86)\\Google\\Chrome\\Application\\chrome.exe',
            ],
            'chromium': [str(local_app_data / 'Chromium' / 'Application' / 'chrome.exe')],
            'brave': [
                str(local_app_data / 'BraveSoftware' / 'Brave-Browser' / 'Application' / 'brave.exe'),
                'C:\\Program Files\\BraveSoftware\\Brave-Browser\\Application\\brave.exe',
                'C:\\Program Files (x86)\\BraveSoftware\\Brave-Browser\\Application\\brave.exe',
            ],
            'edge': [
                str(local_app_data / 'Microsoft' / 'Edge' / 'Application' / 'msedge.exe'),
                'C:\\Program Files\\Microsoft\\Edge\\Application\\msedge.exe',
                'C:\\Program Files (x86)\\Microsoft\\Edge\\Application\\msedge.exe',
            ],
        }
    else:
        per_browser = {}
    # First existing candidate wins; unknown browser/platform -> no candidates.
    for path_str in per_browser.get(key, []):
        if path_str and Path(path_str).exists():
            return path_str
    return None
# Maps a supported browser name to the function that locates its default
# user data directory on the current platform.
BROWSER_PROFILE_FINDERS = {
    'chrome': get_chrome_user_data_dir,
    'chromium': get_chrome_user_data_dir,  # Same locations
    'brave': get_brave_user_data_dir,
    'edge': get_edge_user_data_dir,
}
# Browsers that share the Chromium profile directory layout; only these are
# eligible for the profile copy + CDP cookie extraction path in create_personas().
CHROMIUM_BROWSERS = {'chrome', 'chromium', 'brave', 'edge'}
# =============================================================================
# Cookie Extraction via CDP
# =============================================================================
# Header lines written at the top of every generated Netscape-format
# cookies.txt file (the trailing '' produces a blank separator line).
NETSCAPE_COOKIE_HEADER = [
    '# Netscape HTTP Cookie File',
    '# https://curl.se/docs/http-cookies.html',
    '# This file was generated by ArchiveBox persona cookie extraction',
    '#',
    '# Format: domain\\tincludeSubdomains\\tpath\\tsecure\\texpiry\\tname\\tvalue',
    '',
]
def _parse_netscape_cookies(path: Path) -> "OrderedDict[tuple[str, str, str], tuple[str, str, str, str, str, str, str]]":
cookies = OrderedDict()
if not path.exists():
return cookies
for line in path.read_text().splitlines():
if not line or line.startswith('#'):
continue
parts = line.split('\t')
if len(parts) < 7:
continue
domain, include_subdomains, cookie_path, secure, expiry, name, value = parts[:7]
key = (domain, cookie_path, name)
cookies[key] = (domain, include_subdomains, cookie_path, secure, expiry, name, value)
return cookies
def _write_netscape_cookies(path: Path, cookies: "OrderedDict[tuple[str, str, str], tuple[str, str, str, str, str, str, str]]") -> None:
    """Serialize the cookie mapping to *path* as a Netscape-format file.

    Emits the standard header lines followed by one tab-separated line per
    cookie, in mapping order, with a trailing newline.
    """
    rows = ['\t'.join(fields) for fields in cookies.values()]
    path.write_text('\n'.join(list(NETSCAPE_COOKIE_HEADER) + rows) + '\n')
def _merge_netscape_cookies(existing_file: Path, new_file: Path) -> None:
    """Union the cookies from both files and write the result to existing_file.

    On (domain, path, name) collisions the cookie from *new_file* wins.
    """
    merged = _parse_netscape_cookies(existing_file)
    merged.update(_parse_netscape_cookies(new_file))
    _write_netscape_cookies(existing_file, merged)
def extract_cookies_via_cdp(
    user_data_dir: Path,
    output_file: Path,
    profile_dir: str | None = None,
    chrome_binary: str | None = None,
    timeout: int = 60,
) -> bool:
    """
    Launch Chrome with the given user data dir and extract cookies via CDP.

    Runs the bundled Node.js extract_cookies.js helper (from the chrome
    plugin) in a subprocess, passing all configuration through environment
    variables.  If output_file already exists, cookies are extracted to a
    temp file first and then merged in, so existing cookies are preserved.

    Args:
        user_data_dir: Chromium-style user data directory to launch with.
        output_file: Destination cookies.txt path (Netscape format).
        profile_dir: Optional profile name under the user data dir
            (e.g. 'Default', 'Profile 1'); appended as a
            --profile-directory Chrome arg.
        chrome_binary: Optional explicit browser executable to use.
        timeout: Max seconds to wait for the extraction subprocess
            (default 60, matching the previous hard-coded value).

    Returns True if successful, False otherwise (failures are reported to
    stderr, never raised).
    """
    from archivebox.config.common import STORAGE_CONFIG
    # Find the cookie extraction script shipped with the chrome plugin
    chrome_plugin_dir = Path(__file__).parent.parent / 'plugins' / 'chrome'
    extract_script = chrome_plugin_dir / 'extract_cookies.js'
    if not extract_script.exists():
        rprint(f'[yellow]Cookie extraction script not found at {extract_script}[/yellow]', file=sys.stderr)
        return False
    # Get node modules dir
    node_modules_dir = STORAGE_CONFIG.LIB_DIR / 'npm' / 'node_modules'
    # Set up environment for the Node helper (it reads all config from env)
    env = os.environ.copy()
    env['NODE_MODULES_DIR'] = str(node_modules_dir)
    env['CHROME_USER_DATA_DIR'] = str(user_data_dir)
    env['CHROME_HEADLESS'] = 'true'
    if chrome_binary:
        env['CHROME_BINARY'] = str(chrome_binary)
    # When the output file already exists, extract into a temp file and merge
    # afterwards so previously-collected cookies aren't clobbered.
    output_path = output_file
    temp_output = None
    temp_dir = None
    if output_file.exists():
        temp_dir = Path(tempfile.mkdtemp(prefix='ab_cookies_'))
        temp_output = temp_dir / 'cookies.txt'
        output_path = temp_output
    if profile_dir:
        # Append --profile-directory to any pre-existing CHROME_ARGS_EXTRA,
        # which may be either a JSON list or a comma-separated string.
        extra_arg = f'--profile-directory={profile_dir}'
        existing_extra = env.get('CHROME_ARGS_EXTRA', '').strip()
        args_list = []
        if existing_extra:
            if existing_extra.startswith('['):
                try:
                    parsed = json.loads(existing_extra)
                    if isinstance(parsed, list):
                        args_list.extend(str(x) for x in parsed)
                except Exception:
                    # Malformed JSON: fall back to comma-separated parsing
                    args_list.extend([s.strip() for s in existing_extra.split(',') if s.strip()])
            else:
                args_list.extend([s.strip() for s in existing_extra.split(',') if s.strip()])
        args_list.append(extra_arg)
        env['CHROME_ARGS_EXTRA'] = json.dumps(args_list)
    env['COOKIES_OUTPUT_FILE'] = str(output_path)
    try:
        result = subprocess.run(
            ['node', str(extract_script)],
            env=env,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
        if result.returncode == 0:
            if temp_output and temp_output.exists():
                _merge_netscape_cookies(output_file, temp_output)
            # NOTE: a zero exit with no temp output still counts as success;
            # the helper is trusted to have written output_file directly.
            return True
        else:
            rprint(f'[yellow]Cookie extraction failed: {result.stderr}[/yellow]', file=sys.stderr)
            return False
    except subprocess.TimeoutExpired:
        rprint('[yellow]Cookie extraction timed out[/yellow]', file=sys.stderr)
        return False
    except FileNotFoundError:
        # The 'node' executable itself is missing from PATH
        rprint('[yellow]Node.js not found. Cannot extract cookies.[/yellow]', file=sys.stderr)
        return False
    except Exception as e:
        rprint(f'[yellow]Cookie extraction error: {e}[/yellow]', file=sys.stderr)
        return False
    finally:
        # Always clean up the temp merge dir, on success or failure
        if temp_dir and temp_dir.exists():
            shutil.rmtree(temp_dir, ignore_errors=True)
# =============================================================================
# Validation Helpers
# =============================================================================
def validate_persona_name(name: str) -> tuple[bool, str]:
    """
    Validate persona name to prevent path traversal attacks.

    Rejects empty/whitespace-only names, path separators, '..' sequences,
    leading dots, and control characters that could break file paths.

    Returns:
        (is_valid, error_message): tuple indicating if name is valid
    """
    # Each entry is (rule_failed, error message); first failure wins.
    rules = (
        (not name or not name.strip(),
         "Persona name cannot be empty"),
        ('/' in name or '\\' in name,
         "Persona name cannot contain path separators (/ or \\)"),
        ('..' in name,
         "Persona name cannot contain parent directory references (..)"),
        (name.startswith('.'),
         "Persona name cannot start with a dot (.)"),
        ('\x00' in name or '\n' in name or '\r' in name,
         "Persona name contains invalid characters"),
    )
    for failed, message in rules:
        if failed:
            return False, message
    return True, ""
def ensure_path_within_personas_dir(persona_path: Path) -> bool:
    """
    Verify that a persona path is a strict subpath of PERSONAS_DIR.

    This is a safety check to prevent path traversal attacks where
    a malicious persona name could cause operations on paths outside
    the expected PERSONAS_DIR.

    Note: the path must be strictly *inside* PERSONAS_DIR — a path equal to
    PERSONAS_DIR itself is rejected, since callers use this check to gate
    recursive deletion and allowing equality would permit wiping the entire
    personas directory.

    Returns:
        True if path is safe, False otherwise
    """
    from archivebox.config.constants import CONSTANTS
    try:
        # Resolve both paths to absolute paths (follows symlinks)
        personas_dir = CONSTANTS.PERSONAS_DIR.resolve()
        resolved_path = persona_path.resolve()
        # Strict child check: reject PERSONAS_DIR itself as well as escapees
        return resolved_path != personas_dir and resolved_path.is_relative_to(personas_dir)
    except (ValueError, RuntimeError, OSError):
        # resolve() can raise OSError on some platforms (e.g. permission
        # errors, too many symlinks); treat any resolution failure as unsafe.
        return False
# =============================================================================
# CREATE
# =============================================================================
def create_personas(
    names: Iterable[str],
    import_from: Optional[str] = None,
    profile: Optional[str] = None,
) -> int:
    """
    Create Personas from names.

    If --import is specified, copy the browser profile to the persona directory
    and extract cookies.

    Args:
        names: persona names to create (blank names are skipped).
        import_from: optional browser to import a profile from; must be a key
            of BROWSER_PROFILE_FINDERS (chrome, chromium, brave, edge).
        profile: profile directory name under the user data dir (e.g.
            'Default'); auto-detected as 'Default' when present and unset.

    Exit codes:
        0: Success
        1: Failure
    """
    from archivebox.misc.jsonl import write_record
    from archivebox.personas.models import Persona
    # When stdout is piped, emit JSONL records; all status goes to stderr
    is_tty = sys.stdout.isatty()
    name_list = list(names) if names else []
    if not name_list:
        rprint('[yellow]No persona names provided. Pass names as arguments.[/yellow]', file=sys.stderr)
        return 1
    # Validate import source if specified
    source_profile_dir = None
    if import_from:
        import_from = import_from.lower()
        if import_from not in BROWSER_PROFILE_FINDERS:
            rprint(f'[red]Unknown browser: {import_from}[/red]', file=sys.stderr)
            rprint(f'[dim]Supported browsers: {", ".join(BROWSER_PROFILE_FINDERS.keys())}[/dim]', file=sys.stderr)
            return 1
        source_profile_dir = BROWSER_PROFILE_FINDERS[import_from]()
        if not source_profile_dir:
            rprint(f'[red]Could not find {import_from} profile directory[/red]', file=sys.stderr)
            return 1
        rprint(f'[dim]Found {import_from} profile: {source_profile_dir}[/dim]', file=sys.stderr)
        # Default to the 'Default' profile when none was requested
        if profile is None and (source_profile_dir / 'Default').exists():
            profile = 'Default'
        browser_binary = get_browser_binary(import_from)
        if browser_binary:
            rprint(f'[dim]Using {import_from} binary: {browser_binary}[/dim]', file=sys.stderr)
    else:
        browser_binary = None
    created_count = 0
    for name in name_list:
        name = name.strip()
        if not name:
            continue
        # Validate persona name to prevent path traversal
        is_valid, error_msg = validate_persona_name(name)
        if not is_valid:
            rprint(f'[red]Invalid persona name "{name}": {error_msg}[/red]', file=sys.stderr)
            continue
        # Idempotent: re-running with an existing name is not an error
        persona, created = Persona.objects.get_or_create(name=name)
        if created:
            persona.ensure_dirs()
            created_count += 1
            rprint(f'[green]Created persona: {name}[/green]', file=sys.stderr)
        else:
            rprint(f'[dim]Persona already exists: {name}[/dim]', file=sys.stderr)
        # Import browser profile if requested (also re-imports for existing personas)
        if import_from and source_profile_dir:
            cookies_file = Path(persona.path) / 'cookies.txt'
            if import_from in CHROMIUM_BROWSERS:
                persona_chrome_dir = Path(persona.CHROME_USER_DATA_DIR)
                # Copy the browser profile
                rprint(f'[dim]Copying browser profile to {persona_chrome_dir}...[/dim]', file=sys.stderr)
                try:
                    # Remove existing chrome_user_data if it exists
                    if persona_chrome_dir.exists():
                        shutil.rmtree(persona_chrome_dir)
                    # Copy the profile directory
                    # We copy the entire user data dir, not just Default profile
                    shutil.copytree(
                        source_profile_dir,
                        persona_chrome_dir,
                        symlinks=True,
                        # Skip caches, logs, and browser lock files — they are
                        # large, regenerable, or would break the copied profile
                        ignore=shutil.ignore_patterns(
                            'Cache', 'Code Cache', 'GPUCache', 'ShaderCache',
                            'Service Worker', 'GCM Store', '*.log', 'Crashpad',
                            'BrowserMetrics', 'BrowserMetrics-spare.pma',
                            'SingletonLock', 'SingletonSocket', 'SingletonCookie',
                        ),
                    )
                    rprint('[green]Copied browser profile to persona[/green]', file=sys.stderr)
                    # Extract cookies via CDP
                    rprint('[dim]Extracting cookies via CDP...[/dim]', file=sys.stderr)
                    if extract_cookies_via_cdp(
                        persona_chrome_dir,
                        cookies_file,
                        profile_dir=profile,
                        chrome_binary=browser_binary,
                    ):
                        rprint(f'[green]Extracted cookies to {cookies_file}[/green]', file=sys.stderr)
                    else:
                        # Non-fatal: the persona is still usable without cookies
                        rprint('[yellow]Could not extract cookies automatically.[/yellow]', file=sys.stderr)
                        rprint('[dim]You can manually export cookies using a browser extension.[/dim]', file=sys.stderr)
                except Exception as e:
                    rprint(f'[red]Failed to copy browser profile: {e}[/red]', file=sys.stderr)
                    return 1
        # Emit a JSONL record per persona when piped (both new and existing)
        if not is_tty:
            write_record({
                'id': str(persona.id) if hasattr(persona, 'id') else None,
                'name': persona.name,
                'path': str(persona.path),
                'CHROME_USER_DATA_DIR': persona.CHROME_USER_DATA_DIR,
                'COOKIES_FILE': persona.COOKIES_FILE,
            })
    rprint(f'[green]Created {created_count} new persona(s)[/green]', file=sys.stderr)
    return 0
# =============================================================================
# LIST
# =============================================================================
def list_personas(
    name: Optional[str] = None,
    name__icontains: Optional[str] = None,
    limit: Optional[int] = None,
) -> int:
    """
    List Personas as JSONL with optional filters.

    Prints a human-readable table when stdout is a TTY, JSONL otherwise.

    Exit codes:
        0: Success (even if no results)
    """
    from archivebox.misc.jsonl import write_record
    from archivebox.personas.models import Persona
    interactive = sys.stdout.isatty()
    # Apply name filters and optional limit to the name-ordered queryset
    queryset = apply_filters(
        Persona.objects.all().order_by('name'),
        {'name': name, 'name__icontains': name__icontains},
        limit=limit,
    )
    total = 0
    for persona in queryset:
        cookies_status = '[green]✓[/green]' if persona.COOKIES_FILE else '[dim]✗[/dim]'
        chrome_status = '[green]✓[/green]' if Path(persona.CHROME_USER_DATA_DIR).exists() else '[dim]✗[/dim]'
        if interactive:
            rprint(f'[cyan]{persona.name:20}[/cyan] cookies:{cookies_status} chrome:{chrome_status} [dim]{persona.path}[/dim]')
        else:
            write_record({
                'id': str(persona.id) if hasattr(persona, 'id') else None,
                'name': persona.name,
                'path': str(persona.path),
                'CHROME_USER_DATA_DIR': persona.CHROME_USER_DATA_DIR,
                'COOKIES_FILE': persona.COOKIES_FILE,
            })
        total += 1
    rprint(f'[dim]Listed {total} persona(s)[/dim]', file=sys.stderr)
    return 0
# =============================================================================
# UPDATE
# =============================================================================
def update_personas(name: Optional[str] = None) -> int:
    """
    Update Personas from stdin JSONL.

    Reads Persona records from stdin and applies updates.
    Uses PATCH semantics - only specified fields are updated.
    Records are matched by 'id' first, falling back to 'name'.

    Args:
        name: new name to apply to every matched persona (the on-disk
            persona directory is moved to match the new name).

    Exit codes:
        0: Success
        1: No input or error
    """
    from archivebox.misc.jsonl import read_stdin, write_record
    from archivebox.personas.models import Persona
    # When stdout is piped, echo updated records as JSONL
    is_tty = sys.stdout.isatty()
    records = list(read_stdin())
    if not records:
        rprint('[yellow]No records provided via stdin[/yellow]', file=sys.stderr)
        return 1
    updated_count = 0
    for record in records:
        persona_id = record.get('id')
        old_name = record.get('name')
        # Records with neither id nor name cannot be matched; skip silently
        if not persona_id and not old_name:
            continue
        try:
            if persona_id:
                persona = Persona.objects.get(id=persona_id)
            else:
                persona = Persona.objects.get(name=old_name)
            # Apply updates from CLI flags
            if name:
                # Validate new name to prevent path traversal
                is_valid, error_msg = validate_persona_name(name)
                if not is_valid:
                    rprint(f'[red]Invalid new persona name "{name}": {error_msg}[/red]', file=sys.stderr)
                    continue
                # Rename the persona directory too.  Order matters here:
                # persona.path is derived from persona.name, so capture the
                # old path before assigning the new name, then recompute.
                old_path = persona.path
                persona.name = name
                new_path = persona.path
                if old_path.exists() and old_path != new_path:
                    shutil.move(str(old_path), str(new_path))
            persona.save()
            updated_count += 1
            if not is_tty:
                write_record({
                    'id': str(persona.id) if hasattr(persona, 'id') else None,
                    'name': persona.name,
                    'path': str(persona.path),
                })
        except Persona.DoesNotExist:
            # Missing personas are reported but do not abort the batch
            rprint(f'[yellow]Persona not found: {persona_id or old_name}[/yellow]', file=sys.stderr)
            continue
    rprint(f'[green]Updated {updated_count} persona(s)[/green]', file=sys.stderr)
    return 0
# =============================================================================
# DELETE
# =============================================================================
def delete_personas(yes: bool = False, dry_run: bool = False) -> int:
    """
    Delete Personas from stdin JSONL.

    Requires --yes flag to confirm deletion.  Matches input records by
    'id' or 'name'.  Deletes both the database row and the persona's
    on-disk directory (the latter only after a PERSONAS_DIR containment
    check, to guard against path traversal via malicious names).

    Args:
        yes: must be True to actually delete anything.
        dry_run: print what would be deleted and exit without changes.

    Exit codes:
        0: Success
        1: No input or missing --yes flag
    """
    from archivebox.misc.jsonl import read_stdin
    from archivebox.personas.models import Persona
    records = list(read_stdin())
    if not records:
        rprint('[yellow]No records provided via stdin[/yellow]', file=sys.stderr)
        return 1
    # Collect persona IDs or names (id takes precedence per record)
    persona_ids = []
    persona_names = []
    for r in records:
        if r.get('id'):
            persona_ids.append(r['id'])
        elif r.get('name'):
            persona_names.append(r['name'])
    if not persona_ids and not persona_names:
        rprint('[yellow]No valid persona IDs or names in input[/yellow]', file=sys.stderr)
        return 1
    from django.db.models import Q
    # Match by id OR name in a single query
    query = Q()
    if persona_ids:
        query |= Q(id__in=persona_ids)
    if persona_names:
        query |= Q(name__in=persona_names)
    personas = Persona.objects.filter(query)
    count = personas.count()
    if count == 0:
        rprint('[yellow]No matching personas found[/yellow]', file=sys.stderr)
        return 0
    # Dry run is checked before --yes so it never requires confirmation
    if dry_run:
        rprint(f'[yellow]Would delete {count} persona(s) (dry run)[/yellow]', file=sys.stderr)
        for persona in personas:
            rprint(f' {persona.name} ({persona.path})', file=sys.stderr)
        return 0
    if not yes:
        rprint('[red]Use --yes to confirm deletion[/red]', file=sys.stderr)
        return 1
    # Delete persona directories and database records
    deleted_count = 0
    for persona in personas:
        persona_path = persona.path
        # Safety check: ensure path is within PERSONAS_DIR before deletion
        if not ensure_path_within_personas_dir(persona_path):
            # Skips the DB row too, so the record stays visible for inspection
            rprint(f'[red]Security error: persona path "{persona_path}" is outside PERSONAS_DIR. Skipping deletion.[/red]', file=sys.stderr)
            continue
        if persona_path.exists():
            shutil.rmtree(persona_path)
        persona.delete()
        deleted_count += 1
    rprint(f'[green]Deleted {deleted_count} persona(s)[/green]', file=sys.stderr)
    return 0
# =============================================================================
# CLI Commands
# =============================================================================
# Click group entry point; subcommands register themselves below via
# @main.command(...).
@click.group()
def main():
    """Manage Persona records (browser profiles)."""
    pass
@main.command('create')
@click.argument('names', nargs=-1)
@click.option('--import', 'import_from', help='Import profile from browser (chrome, chromium, brave, edge)')
@click.option('--profile', help='Profile directory name under the user data dir (e.g. Default, Profile 1)')
def create_cmd(names: tuple, import_from: Optional[str], profile: Optional[str]):
    """Create Personas, optionally importing from a browser profile."""
    # Process exit code comes straight from create_personas (0 ok, 1 failure)
    sys.exit(create_personas(names, import_from=import_from, profile=profile))
@main.command('list')
@click.option('--name', help='Filter by exact name')
@click.option('--name__icontains', help='Filter by name contains')
@click.option('--limit', '-n', type=int, help='Limit number of results')
def list_cmd(name: Optional[str], name__icontains: Optional[str], limit: Optional[int]):
    """List Personas as JSONL."""
    # Always exits 0, even with no results
    sys.exit(list_personas(name=name, name__icontains=name__icontains, limit=limit))
@main.command('update')
@click.option('--name', '-n', help='Set new name')
def update_cmd(name: Optional[str]):
    """Update Personas from stdin JSONL."""
    # Reads records to update from stdin; exits 1 when stdin is empty
    sys.exit(update_personas(name=name))
@main.command('delete')
@click.option('--yes', '-y', is_flag=True, help='Confirm deletion')
@click.option('--dry-run', is_flag=True, help='Show what would be deleted')
def delete_cmd(yes: bool, dry_run: bool):
    """Delete Personas from stdin JSONL."""
    # Destructive: requires --yes unless --dry-run; records come from stdin
    sys.exit(delete_personas(yes=yes, dry_run=dry_run))
# Allow running this module directly (normally invoked via `archivebox persona`)
if __name__ == '__main__':
    main()