Files
2025-12-29 13:21:46 -08:00

101 lines
2.9 KiB
Python

"""
Ripgrep search backend - searches files directly without indexing.
This backend doesn't maintain an index - it searches archived files directly
using ripgrep (rg). This is simpler but slower for large archives.
Environment variables:
RIPGREP_BINARY: Path to ripgrep binary (default: rg)
RIPGREP_ARGS: Default ripgrep arguments (JSON array)
RIPGREP_ARGS_EXTRA: Extra arguments to append (JSON array)
RIPGREP_TIMEOUT: Search timeout in seconds (default: 90)
"""
import json
import os
import subprocess
import shutil
from pathlib import Path
from typing import List, Iterable
from django.conf import settings
def get_env(name: str, default: str = '') -> str:
return os.environ.get(name, default).strip()
def get_env_int(name: str, default: int = 0) -> int:
try:
return int(get_env(name, str(default)))
except ValueError:
return default
def get_env_array(name: str, default: list[str] | None = None) -> list[str]:
"""Parse a JSON array from environment variable."""
val = get_env(name, '')
if not val:
return default if default is not None else []
try:
result = json.loads(val)
if isinstance(result, list):
return [str(item) for item in result]
return default if default is not None else []
except json.JSONDecodeError:
return default if default is not None else []
def search(query: str) -> List[str]:
"""Search for snapshots using ripgrep."""
rg_binary = get_env('RIPGREP_BINARY', 'rg')
rg_binary = shutil.which(rg_binary) or rg_binary
if not rg_binary or not Path(rg_binary).exists():
raise RuntimeError(f'ripgrep binary not found. Install with: apt install ripgrep')
timeout = get_env_int('RIPGREP_TIMEOUT', 90)
ripgrep_args = get_env_array('RIPGREP_ARGS', [])
ripgrep_args_extra = get_env_array('RIPGREP_ARGS_EXTRA', [])
archive_dir = Path(settings.ARCHIVE_DIR)
if not archive_dir.exists():
return []
cmd = [
rg_binary,
*ripgrep_args,
*ripgrep_args_extra,
'--regexp',
query,
str(archive_dir),
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
# Extract snapshot IDs from file paths
# Paths look like: archive/<snapshot_id>/<extractor>/file.txt
snapshot_ids = set()
for line in result.stdout.strip().split('\n'):
if not line:
continue
path = Path(line)
try:
relative = path.relative_to(archive_dir)
snapshot_id = relative.parts[0]
snapshot_ids.add(snapshot_id)
except (ValueError, IndexError):
continue
return list(snapshot_ids)
except subprocess.TimeoutExpired:
return []
except Exception:
return []
def flush(snapshot_ids: Iterable[str]) -> None:
"""No-op for ripgrep - it searches files directly."""
pass