Files
ArchiveBox/archivebox/mcp/server.py
2025-12-25 03:59:51 -08:00

394 lines
13 KiB
Python

__package__ = 'archivebox.mcp'
"""
Model Context Protocol (MCP) server implementation for ArchiveBox.
Dynamically exposes all ArchiveBox CLI commands as MCP tools by introspecting
Click command metadata. Handles JSON-RPC 2.0 requests over stdio transport.
"""
import sys
import json
import traceback
from typing import Any, Dict, List, Optional
from io import StringIO
from contextlib import redirect_stdout, redirect_stderr
import click
from click.testing import CliRunner
from archivebox.config.version import VERSION
class MCPJSONEncoder(json.JSONEncoder):
    """JSON encoder that copes with Click sentinel objects and other non-JSON values."""

    def default(self, obj):
        """Return a JSON-serializable stand-in for ``obj``.

        Click sentinel instances map to ``None``, tuples map to lists, and
        anything the base encoder rejects is rendered with ``str()``.
        """
        # The sentinel class is looked up defensively because the attribute
        # may be absent from the installed Click.
        sentinel_cls = getattr(getattr(click, 'core', None), '_SentinelClass', None)
        if sentinel_cls is not None and isinstance(obj, sentinel_cls):
            return None

        # Tuples are emitted as JSON arrays
        if isinstance(obj, tuple):
            return list(obj)

        # Last resort: let the base class try, then fall back to the string form
        try:
            fallback = super().default(obj)
        except TypeError:
            fallback = str(obj)
        return fallback
# Type mapping from Click types to JSON Schema types
def click_type_to_json_schema_type(click_type) -> dict:
    """Map a Click parameter type instance onto a JSON Schema fragment.

    Types that match none of the known Click classes are described as a
    plain string.
    """
    param_types = click.types

    # Ordered (Click type class, schema builder) pairs. Each builder creates a
    # fresh dict on every call so the caller can mutate the result safely.
    schema_builders = (
        (param_types.StringParamType, lambda t: {"type": "string"}),
        (param_types.IntParamType, lambda t: {"type": "integer"}),
        (param_types.FloatParamType, lambda t: {"type": "number"}),
        (param_types.BoolParamType, lambda t: {"type": "boolean"}),
        (param_types.Choice, lambda t: {"type": "string", "enum": t.choices}),
        (param_types.Path, lambda t: {"type": "string", "description": "File or directory path"}),
        (param_types.File, lambda t: {"type": "string", "description": "File path"}),
        # Tuple-typed parameters are exposed as an array of strings
        (param_types.Tuple, lambda t: {"type": "array", "items": {"type": "string"}}),
    )

    for type_cls, build_schema in schema_builders:
        if isinstance(click_type, type_cls):
            return build_schema(click_type)

    # Unknown types default to string
    return {"type": "string"}
def click_command_to_mcp_tool(cmd_name: str, click_command: click.Command) -> dict:
    """
    Convert a Click command to an MCP tool definition with JSON Schema.

    Introspects the Click command's parameters to automatically generate
    the input schema without manual definition.

    Args:
        cmd_name: Name under which the tool is exposed.
        click_command: Click command whose ``params`` are introspected.

    Returns:
        A dict with ``name``, ``description`` and ``inputSchema`` keys, where
        ``inputSchema`` is an object schema holding one property per
        parameter plus the list of required parameter names.
    """
    properties = {}
    required = []

    # Extract parameters from Click command
    for param in click_command.params:
        # Skip internal parameters
        if param.name in ('help', 'version'):
            continue

        param_schema = click_type_to_json_schema_type(param.type)

        # Not every Click parameter defines `help` (plain click.Argument does
        # not), so read it defensively instead of raising AttributeError and
        # losing the whole tool definition.
        help_text = getattr(param, 'help', None)
        if help_text:
            param_schema["description"] = help_text

        default = param.default
        has_default = default is not None and default != ()

        # Options declared with multiple=True and variadic arguments
        # (nargs=-1) both accept a list of values.
        if param.multiple or param.nargs == -1:
            array_schema = {
                "type": "array",
                "items": param_schema,
                "description": param_schema.get("description", f"Multiple {param.name} values"),
            }
            # The default describes the whole list, so it belongs on the
            # array schema rather than on the per-item schema.
            if has_default:
                array_schema["default"] = default
            properties[param.name] = array_schema
        else:
            if has_default:
                param_schema["default"] = default
            properties[param.name] = param_schema

        # Mark as required if Click requires it
        if param.required:
            required.append(param.name)

    return {
        "name": cmd_name,
        "description": click_command.help or click_command.short_help or f"Run archivebox {cmd_name} command",
        "inputSchema": {
            "type": "object",
            "properties": properties,
            "required": required,
        },
    }
def execute_click_command(cmd_name: str, click_command: click.Command, arguments: dict) -> dict:
    """
    Execute a Click command programmatically with given arguments.

    Args:
        cmd_name: Name of the CLI command (used for Django setup and messages).
        click_command: The Click command object to invoke.
        arguments: Mapping of parameter name to value. Values for positional
            arguments may be scalars or lists; ``None`` values are skipped.

    Returns:
        MCP-formatted result: ``{"content": [...], "isError": bool}`` with the
        captured output and an error flag derived from the exit code.
    """
    # Setup Django for archive commands (commands that need database access)
    from archivebox.cli import ArchiveBoxGroup

    if cmd_name in ArchiveBoxGroup.archive_commands:
        try:
            from archivebox.config.django import setup_django
            from archivebox.misc.checks import check_data_folder
            setup_django()
            check_data_folder()
        except Exception as e:
            # If Django setup fails, return error (unless it's manage/shell which handle this themselves)
            if cmd_name not in ('manage', 'shell'):
                return {
                    "content": [{
                        "type": "text",
                        "text": f"Error setting up Django: {str(e)}\n\nMake sure you're running the MCP server from inside an ArchiveBox data directory."
                    }],
                    "isError": True
                }

    # Use Click's test runner to invoke command programmatically
    runner = CliRunner()

    # Map parameter names to their Click parameter objects (Argument vs Option)
    param_map = {param.name: param for param in click_command.params}

    option_args = []
    positional_args = []
    for key, value in (arguments or {}).items():
        param = param_map.get(key)

        # Positional arguments are passed bare, after all options
        if isinstance(param, click.Argument):
            if isinstance(value, (list, tuple)):
                positional_args.extend(str(v) for v in value)
            elif value is not None:
                positional_args.append(str(value))
            continue

        # Use the flag the option was actually declared with (preferring the
        # longest spelling); only guess from the name for unknown keys.
        declared_opts = getattr(param, 'opts', None) or []
        if declared_opts:
            flag = max(declared_opts, key=len)
        else:
            flag = f"--{key.replace('_', '-')}"  # Click uses dashes

        if isinstance(value, bool):
            if value:
                option_args.append(flag)
            else:
                # For on/off switches (--x/--no-x) an explicit False must
                # select the "off" flag, otherwise the default would apply.
                off_opts = getattr(param, 'secondary_opts', None) or []
                if off_opts:
                    option_args.append(max(off_opts, key=len))
        elif isinstance(value, (list, tuple)):
            # Multiple values for an option: repeat the flag per value
            for item in value:
                option_args.append(flag)
                option_args.append(str(item))
        elif value is not None:
            option_args.append(flag)
            option_args.append(str(value))

    args = list(option_args)
    if positional_args:
        # "--" ends option parsing so a positional value that starts with a
        # dash cannot be interpreted as an option.
        args.append('--')
        args.extend(positional_args)

    # Execute the command
    try:
        result = runner.invoke(click_command, args, catch_exceptions=False)

        # Format output as MCP content. `stdout` is used rather than `output`
        # so that stderr is not reported twice when the runner captures the
        # two streams separately.
        content = []
        stdout_text = result.stdout
        if stdout_text:
            content.append({
                "type": "text",
                "text": stdout_text
            })

        if result.stderr_bytes:
            stderr_text = result.stderr_bytes.decode('utf-8', errors='replace')
            if stderr_text.strip():
                content.append({
                    "type": "text",
                    "text": f"[stderr]\n{stderr_text}"
                })

        # Check exit code
        is_error = result.exit_code != 0
        if is_error and not content:
            content.append({
                "type": "text",
                "text": f"Command failed with exit code {result.exit_code}"
            })

        return {
            "content": content or [{"type": "text", "text": "(no output)"}],
            "isError": is_error
        }
    except Exception as e:
        # Capture any exceptions during execution
        error_trace = traceback.format_exc()
        return {
            "content": [{
                "type": "text",
                "text": f"Error executing {cmd_name}: {str(e)}\n\n{error_trace}"
            }],
            "isError": True
        }
class MCPServer:
    """
    Model Context Protocol server for ArchiveBox.

    Provides JSON-RPC 2.0 interface over stdio, dynamically exposing
    all Click commands as MCP tools.
    """

    class InvalidParamsError(ValueError):
        """Raised when a request's params are missing or malformed.

        Subclasses ValueError so callers that catch ValueError keep working.
        """

    def __init__(self):
        # Import here to avoid circular imports
        from archivebox.cli import ArchiveBoxGroup

        self.cli_group = ArchiveBoxGroup()
        self.protocol_version = "2025-11-25"
        self._tool_cache = {}  # Cache loaded Click commands, keyed by command name

    @staticmethod
    def _error_response(request_id, code: int, message: str, data=None) -> dict:
        """Build a JSON-RPC 2.0 error response; ``data`` is included only when given."""
        error = {"code": code, "message": message}
        if data is not None:
            error["data"] = data
        return {"jsonrpc": "2.0", "id": request_id, "error": error}

    def get_click_command(self, cmd_name: str) -> "Optional[click.Command]":
        """Get a Click command by name, with caching. Returns None for unknown names."""
        if cmd_name not in self._tool_cache:
            if cmd_name not in self.cli_group.all_subcommands:
                return None
            self._tool_cache[cmd_name] = self.cli_group.get_command(None, cmd_name)
        return self._tool_cache[cmd_name]

    def handle_initialize(self, params: dict) -> dict:
        """Handle MCP initialize request"""
        return {
            "protocolVersion": self.protocol_version,
            "capabilities": {
                "tools": {}
            },
            "serverInfo": {
                "name": "archivebox-mcp",
                "version": VERSION
            }
        }

    def handle_tools_list(self, params: dict) -> dict:
        """Handle MCP tools/list request - returns all available CLI commands as tools"""
        tools = []
        for cmd_name in self.cli_group.all_subcommands:
            try:
                # Loading the command is inside the try as well, so one
                # command that fails to load cannot break the whole listing.
                click_cmd = self.get_click_command(cmd_name)
                if click_cmd:
                    tools.append(click_command_to_mcp_tool(cmd_name, click_cmd))
            except Exception as e:
                # Log but don't fail - skip problematic commands
                print(f"Warning: Could not generate tool for {cmd_name}: {e}", file=sys.stderr)
        return {"tools": tools}

    def handle_tools_call(self, params: dict) -> dict:
        """
        Handle MCP tools/call request - executes a CLI command.

        Raises:
            InvalidParamsError (a ValueError): if ``name`` is missing or not a
                string, ``arguments`` is not an object, or the tool is unknown.
        """
        tool_name = params.get('name')
        # An explicit null for "arguments" is treated like an omitted value
        arguments = params.get('arguments') or {}

        if not tool_name:
            raise self.InvalidParamsError("Missing required parameter: name")
        if not isinstance(tool_name, str):
            raise self.InvalidParamsError("Invalid parameter: name must be a string")
        if not isinstance(arguments, dict):
            raise self.InvalidParamsError("Invalid parameter: arguments must be an object")

        click_cmd = self.get_click_command(tool_name)
        if not click_cmd:
            raise self.InvalidParamsError(f"Unknown tool: {tool_name}")

        # Execute the command and return MCP-formatted result
        return execute_click_command(tool_name, click_cmd, arguments)

    def handle_request(self, request: dict) -> dict:
        """
        Handle a JSON-RPC 2.0 request and return response.

        Supports MCP methods: initialize, tools/list, tools/call

        Always returns a response dict. Error codes used: -32600 (payload is
        not an object), -32601 (unknown method), -32602 (invalid params),
        -32603 (any other failure, with the traceback in ``data``).
        """
        if not isinstance(request, dict):
            return self._error_response(None, -32600, "Invalid Request", "Request must be a JSON object")

        method = request.get('method')
        request_id = request.get('id')

        # An explicit null for "params" is treated like an omitted value
        params = request.get('params') or {}
        if not isinstance(params, dict):
            return self._error_response(request_id, -32602, "Invalid params: expected an object")

        try:
            # Route to appropriate handler
            if method == 'initialize':
                result = self.handle_initialize(params)
            elif method == 'tools/list':
                result = self.handle_tools_list(params)
            elif method == 'tools/call':
                result = self.handle_tools_call(params)
            else:
                return self._error_response(request_id, -32601, f"Method not found: {method}")
        except self.InvalidParamsError as e:
            return self._error_response(request_id, -32602, str(e))
        except Exception as e:
            return self._error_response(request_id, -32603, str(e), traceback.format_exc())

        # Success response
        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "result": result
        }

    def handle_message(self, message: Any) -> Optional[dict]:
        """
        Process one decoded JSON-RPC message.

        Returns the response to send, or None when the message is a
        notification (it has a ``method`` but no ``id``). JSON-RPC 2.0 forbids
        replying to notifications, even with an error.
        """
        response = self.handle_request(message)
        if isinstance(message, dict) and 'method' in message and 'id' not in message:
            return None
        return response

    def run_stdio_server(self):
        """
        Run the MCP server in stdio mode.

        Reads JSON-RPC requests from stdin (one per line),
        writes JSON-RPC responses to stdout (one per line).
        Blank lines are skipped and notifications produce no output.
        """
        for line in sys.stdin:
            line = line.strip()
            if not line:
                continue

            try:
                request = json.loads(line)
            except json.JSONDecodeError as e:
                # Invalid JSON
                response = self._error_response(None, -32700, "Parse error", str(e))
            else:
                response = self.handle_message(request)

            if response is not None:
                # Use custom encoder for Click types
                print(json.dumps(response, cls=MCPJSONEncoder), flush=True)
def run_mcp_server():
    """Entry point: create an MCPServer and run its stdio request loop."""
    MCPServer().run_stdio_server()