Files
2025-12-24 20:10:38 -08:00

51 lines
1.7 KiB
Python

"""
Sonic search backend - search and flush operations.
This module provides the search interface for the Sonic backend.
"""
import os
from typing import List, Iterable
def get_sonic_config() -> dict:
"""Get Sonic connection configuration."""
return {
'host': os.environ.get('SEARCH_BACKEND_HOST_NAME', '127.0.0.1').strip(),
'port': int(os.environ.get('SEARCH_BACKEND_PORT', '1491')),
'password': os.environ.get('SEARCH_BACKEND_PASSWORD', 'SecretPassword').strip(),
'collection': os.environ.get('SONIC_COLLECTION', 'archivebox').strip(),
'bucket': os.environ.get('SONIC_BUCKET', 'snapshots').strip(),
}
def search(query: str) -> List[str]:
"""Search for snapshots in Sonic."""
try:
from sonic import SearchClient
except ImportError:
raise RuntimeError('sonic-client not installed. Run: pip install sonic-client')
config = get_sonic_config()
with SearchClient(config['host'], config['port'], config['password']) as search_client:
results = search_client.query(config['collection'], config['bucket'], query, limit=100)
return results
def flush(snapshot_ids: Iterable[str]) -> None:
"""Remove snapshots from Sonic index."""
try:
from sonic import IngestClient
except ImportError:
raise RuntimeError('sonic-client not installed. Run: pip install sonic-client')
config = get_sonic_config()
with IngestClient(config['host'], config['port'], config['password']) as ingest:
for snapshot_id in snapshot_ids:
try:
ingest.flush_object(config['collection'], config['bucket'], snapshot_id)
except Exception:
pass