mirror of
https://github.com/ArchiveBox/ArchiveBox.git
synced 2026-01-02 17:05:38 +10:00
394 lines
13 KiB
Python
394 lines
13 KiB
Python
__package__ = 'archivebox.mcp'
|
|
|
|
"""
|
|
Model Context Protocol (MCP) server implementation for ArchiveBox.
|
|
|
|
Dynamically exposes all ArchiveBox CLI commands as MCP tools by introspecting
|
|
Click command metadata. Handles JSON-RPC 2.0 requests over stdio transport.
|
|
"""
|
|
|
|
import sys
|
|
import json
|
|
import traceback
|
|
from typing import Any, Dict, List, Optional
|
|
from io import StringIO
|
|
from contextlib import redirect_stdout, redirect_stderr
|
|
|
|
import click
|
|
from click.testing import CliRunner
|
|
|
|
from archivebox.config.version import VERSION
|
|
|
|
|
|
class MCPJSONEncoder(json.JSONEncoder):
|
|
"""Custom JSON encoder that handles Click sentinel values and other special types"""
|
|
|
|
def default(self, obj):
|
|
# Handle Click's sentinel values
|
|
if hasattr(click, 'core') and hasattr(click.core, '_SentinelClass'):
|
|
if isinstance(obj, click.core._SentinelClass):
|
|
return None
|
|
|
|
# Handle tuples (convert to lists)
|
|
if isinstance(obj, tuple):
|
|
return list(obj)
|
|
|
|
# Handle any other non-serializable objects
|
|
try:
|
|
return super().default(obj)
|
|
except TypeError:
|
|
return str(obj)
|
|
|
|
|
|
# Type mapping from Click types to JSON Schema types
|
|
def click_type_to_json_schema_type(click_type) -> dict:
|
|
"""Convert a Click parameter type to JSON Schema type definition"""
|
|
|
|
if isinstance(click_type, click.types.StringParamType):
|
|
return {"type": "string"}
|
|
elif isinstance(click_type, click.types.IntParamType):
|
|
return {"type": "integer"}
|
|
elif isinstance(click_type, click.types.FloatParamType):
|
|
return {"type": "number"}
|
|
elif isinstance(click_type, click.types.BoolParamType):
|
|
return {"type": "boolean"}
|
|
elif isinstance(click_type, click.types.Choice):
|
|
return {"type": "string", "enum": click_type.choices}
|
|
elif isinstance(click_type, click.types.Path):
|
|
return {"type": "string", "description": "File or directory path"}
|
|
elif isinstance(click_type, click.types.File):
|
|
return {"type": "string", "description": "File path"}
|
|
elif isinstance(click_type, click.types.Tuple):
|
|
# Multiple arguments of same type
|
|
return {"type": "array", "items": {"type": "string"}}
|
|
else:
|
|
# Default to string for unknown types
|
|
return {"type": "string"}
|
|
|
|
|
|
def click_command_to_mcp_tool(cmd_name: str, click_command: click.Command) -> dict:
|
|
"""
|
|
Convert a Click command to an MCP tool definition with JSON Schema.
|
|
|
|
Introspects the Click command's parameters to automatically generate
|
|
the input schema without manual definition.
|
|
"""
|
|
|
|
properties = {}
|
|
required = []
|
|
|
|
# Extract parameters from Click command
|
|
for param in click_command.params:
|
|
# Skip internal parameters
|
|
if param.name in ('help', 'version'):
|
|
continue
|
|
|
|
param_schema = click_type_to_json_schema_type(param.type)
|
|
|
|
# Add description from Click help text
|
|
if param.help:
|
|
param_schema["description"] = param.help
|
|
|
|
# Handle default values
|
|
if param.default is not None and param.default != ():
|
|
param_schema["default"] = param.default
|
|
|
|
# Handle multiple values (like multiple URLs)
|
|
if param.multiple:
|
|
properties[param.name] = {
|
|
"type": "array",
|
|
"items": param_schema,
|
|
"description": param_schema.get("description", f"Multiple {param.name} values")
|
|
}
|
|
else:
|
|
properties[param.name] = param_schema
|
|
|
|
# Mark as required if Click requires it
|
|
if param.required:
|
|
required.append(param.name)
|
|
|
|
return {
|
|
"name": cmd_name,
|
|
"description": click_command.help or click_command.short_help or f"Run archivebox {cmd_name} command",
|
|
"inputSchema": {
|
|
"type": "object",
|
|
"properties": properties,
|
|
"required": required
|
|
}
|
|
}
|
|
|
|
|
|
def execute_click_command(cmd_name: str, click_command: click.Command, arguments: dict) -> dict:
|
|
"""
|
|
Execute a Click command programmatically with given arguments.
|
|
|
|
Returns MCP-formatted result with captured output and error status.
|
|
"""
|
|
|
|
# Setup Django for archive commands (commands that need database access)
|
|
from archivebox.cli import ArchiveBoxGroup
|
|
if cmd_name in ArchiveBoxGroup.archive_commands:
|
|
try:
|
|
from archivebox.config.django import setup_django
|
|
from archivebox.misc.checks import check_data_folder
|
|
setup_django()
|
|
check_data_folder()
|
|
except Exception as e:
|
|
# If Django setup fails, return error (unless it's manage/shell which handle this themselves)
|
|
if cmd_name not in ('manage', 'shell'):
|
|
return {
|
|
"content": [{
|
|
"type": "text",
|
|
"text": f"Error setting up Django: {str(e)}\n\nMake sure you're running the MCP server from inside an ArchiveBox data directory."
|
|
}],
|
|
"isError": True
|
|
}
|
|
|
|
# Use Click's test runner to invoke command programmatically
|
|
runner = CliRunner()
|
|
|
|
# Build a map of parameter names to their Click types (Argument vs Option)
|
|
param_map = {param.name: param for param in click_command.params}
|
|
|
|
# Convert arguments dict to CLI args list
|
|
args = []
|
|
positional_args = []
|
|
|
|
for key, value in arguments.items():
|
|
param_name = key.replace('_', '-') # Click uses dashes
|
|
param = param_map.get(key)
|
|
|
|
# Check if this is a positional Argument (not an Option)
|
|
is_argument = isinstance(param, click.Argument)
|
|
|
|
if is_argument:
|
|
# Positional arguments - add them without dashes
|
|
if isinstance(value, list):
|
|
positional_args.extend([str(v) for v in value])
|
|
elif value is not None:
|
|
positional_args.append(str(value))
|
|
else:
|
|
# Options - add with dashes
|
|
if isinstance(value, bool):
|
|
if value:
|
|
args.append(f'--{param_name}')
|
|
elif isinstance(value, list):
|
|
# Multiple values for an option (rare)
|
|
for item in value:
|
|
args.append(f'--{param_name}')
|
|
args.append(str(item))
|
|
elif value is not None:
|
|
args.append(f'--{param_name}')
|
|
args.append(str(value))
|
|
|
|
# Add positional arguments at the end
|
|
args.extend(positional_args)
|
|
|
|
# Execute the command
|
|
try:
|
|
result = runner.invoke(click_command, args, catch_exceptions=False)
|
|
|
|
# Format output as MCP content
|
|
content = []
|
|
|
|
if result.output:
|
|
content.append({
|
|
"type": "text",
|
|
"text": result.output
|
|
})
|
|
|
|
if result.stderr_bytes:
|
|
stderr_text = result.stderr_bytes.decode('utf-8', errors='replace')
|
|
if stderr_text.strip():
|
|
content.append({
|
|
"type": "text",
|
|
"text": f"[stderr]\n{stderr_text}"
|
|
})
|
|
|
|
# Check exit code
|
|
is_error = result.exit_code != 0
|
|
|
|
if is_error and not content:
|
|
content.append({
|
|
"type": "text",
|
|
"text": f"Command failed with exit code {result.exit_code}"
|
|
})
|
|
|
|
return {
|
|
"content": content or [{"type": "text", "text": "(no output)"}],
|
|
"isError": is_error
|
|
}
|
|
|
|
except Exception as e:
|
|
# Capture any exceptions during execution
|
|
error_trace = traceback.format_exc()
|
|
return {
|
|
"content": [{
|
|
"type": "text",
|
|
"text": f"Error executing {cmd_name}: {str(e)}\n\n{error_trace}"
|
|
}],
|
|
"isError": True
|
|
}
|
|
|
|
|
|
class MCPServer:
|
|
"""
|
|
Model Context Protocol server for ArchiveBox.
|
|
|
|
Provides JSON-RPC 2.0 interface over stdio, dynamically exposing
|
|
all Click commands as MCP tools.
|
|
"""
|
|
|
|
def __init__(self):
|
|
# Import here to avoid circular imports
|
|
from archivebox.cli import ArchiveBoxGroup
|
|
|
|
self.cli_group = ArchiveBoxGroup()
|
|
self.protocol_version = "2025-11-25"
|
|
self._tool_cache = {} # Cache loaded Click commands
|
|
|
|
def get_click_command(self, cmd_name: str) -> Optional[click.Command]:
|
|
"""Get a Click command by name, with caching"""
|
|
if cmd_name not in self._tool_cache:
|
|
if cmd_name not in self.cli_group.all_subcommands:
|
|
return None
|
|
self._tool_cache[cmd_name] = self.cli_group.get_command(None, cmd_name)
|
|
return self._tool_cache[cmd_name]
|
|
|
|
def handle_initialize(self, params: dict) -> dict:
|
|
"""Handle MCP initialize request"""
|
|
return {
|
|
"protocolVersion": self.protocol_version,
|
|
"capabilities": {
|
|
"tools": {}
|
|
},
|
|
"serverInfo": {
|
|
"name": "archivebox-mcp",
|
|
"version": VERSION
|
|
}
|
|
}
|
|
|
|
def handle_tools_list(self, params: dict) -> dict:
|
|
"""Handle MCP tools/list request - returns all available CLI commands as tools"""
|
|
tools = []
|
|
|
|
for cmd_name in self.cli_group.all_subcommands.keys():
|
|
click_cmd = self.get_click_command(cmd_name)
|
|
if click_cmd:
|
|
try:
|
|
tool_def = click_command_to_mcp_tool(cmd_name, click_cmd)
|
|
tools.append(tool_def)
|
|
except Exception as e:
|
|
# Log but don't fail - skip problematic commands
|
|
print(f"Warning: Could not generate tool for {cmd_name}: {e}", file=sys.stderr)
|
|
|
|
return {"tools": tools}
|
|
|
|
def handle_tools_call(self, params: dict) -> dict:
|
|
"""Handle MCP tools/call request - executes a CLI command"""
|
|
tool_name = params.get('name')
|
|
arguments = params.get('arguments', {})
|
|
|
|
if not tool_name:
|
|
raise ValueError("Missing required parameter: name")
|
|
|
|
click_cmd = self.get_click_command(tool_name)
|
|
if not click_cmd:
|
|
raise ValueError(f"Unknown tool: {tool_name}")
|
|
|
|
# Execute the command and return MCP-formatted result
|
|
return execute_click_command(tool_name, click_cmd, arguments)
|
|
|
|
def handle_request(self, request: dict) -> dict:
|
|
"""
|
|
Handle a JSON-RPC 2.0 request and return response.
|
|
|
|
Supports MCP methods: initialize, tools/list, tools/call
|
|
"""
|
|
|
|
method = request.get('method')
|
|
params = request.get('params', {})
|
|
request_id = request.get('id')
|
|
|
|
try:
|
|
# Route to appropriate handler
|
|
if method == 'initialize':
|
|
result = self.handle_initialize(params)
|
|
elif method == 'tools/list':
|
|
result = self.handle_tools_list(params)
|
|
elif method == 'tools/call':
|
|
result = self.handle_tools_call(params)
|
|
else:
|
|
# Method not found
|
|
return {
|
|
"jsonrpc": "2.0",
|
|
"id": request_id,
|
|
"error": {
|
|
"code": -32601,
|
|
"message": f"Method not found: {method}"
|
|
}
|
|
}
|
|
|
|
# Success response
|
|
return {
|
|
"jsonrpc": "2.0",
|
|
"id": request_id,
|
|
"result": result
|
|
}
|
|
|
|
except Exception as e:
|
|
# Error response
|
|
error_trace = traceback.format_exc()
|
|
return {
|
|
"jsonrpc": "2.0",
|
|
"id": request_id,
|
|
"error": {
|
|
"code": -32603,
|
|
"message": str(e),
|
|
"data": error_trace
|
|
}
|
|
}
|
|
|
|
def run_stdio_server(self):
|
|
"""
|
|
Run the MCP server in stdio mode.
|
|
|
|
Reads JSON-RPC requests from stdin (one per line),
|
|
writes JSON-RPC responses to stdout (one per line).
|
|
"""
|
|
|
|
# Read requests from stdin line by line
|
|
for line in sys.stdin:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
|
|
try:
|
|
# Parse JSON-RPC request
|
|
request = json.loads(line)
|
|
|
|
# Handle request
|
|
response = self.handle_request(request)
|
|
|
|
# Write response to stdout (use custom encoder for Click types)
|
|
print(json.dumps(response, cls=MCPJSONEncoder), flush=True)
|
|
|
|
except json.JSONDecodeError as e:
|
|
# Invalid JSON
|
|
error_response = {
|
|
"jsonrpc": "2.0",
|
|
"id": None,
|
|
"error": {
|
|
"code": -32700,
|
|
"message": "Parse error",
|
|
"data": str(e)
|
|
}
|
|
}
|
|
print(json.dumps(error_response, cls=MCPJSONEncoder), flush=True)
|
|
|
|
|
|
def run_mcp_server():
|
|
"""Main entry point for MCP server"""
|
|
server = MCPServer()
|
|
server.run_stdio_server()
|