自己實現一個 ClickHouse 的 MCP
背景
最近看到一篇介紹 GreptimeDB MCP 的文章,效果挺酷的。這裏也試着寫了一個 ClickHouse 的實現,大概花了幾個小時,代碼開源在 https://github.com/dubin555/clickhouse_mcp_server 。GitHub 上已經有兩三個類似的實現,我寫的這個大概是代碼和註釋最全的
效果
寫數據
先向 ClickHouse 寫點假數據,這裏造一些假的銷售數據(什麼數據都可以,也可以讓豆包、元寶幫你生成)
-- Create sales analysis table with comments
CREATE TABLE IF NOT EXISTS default.city_sales
(
    city String COMMENT 'Name of the city where the sale occurred',
    product_category Enum('Electronics' = 1, 'Apparel' = 2, 'Grocery' = 3) COMMENT 'Category of the product sold',
    sale_date Date COMMENT 'Date of the sales transaction',
    units_sold UInt32 COMMENT 'Number of units sold in the transaction',
    unit_price Float32 COMMENT 'Price per unit in USD',
    -- Derived column: computed from units_sold and unit_price, never inserted directly
    total_sales Float32 MATERIALIZED units_sold * unit_price COMMENT 'Calculated total sales amount'
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(sale_date)
ORDER BY (city, product_category, sale_date)
COMMENT 'Table storing city-wise product sales data for business analysis';
-- Generate 10,000 random sales records
INSERT INTO default.city_sales (city, product_category, sale_date, units_sold, unit_price)
SELECT
    -- Each rand() call gets a distinct dummy argument. ClickHouse ignores the
    -- argument's value, but identical rand() calls in one query are treated as
    -- a common subexpression, which would derive city, category, date and
    -- units from the same random number in every row.
    ['New York', 'London', 'Tokyo', 'Paris', 'Singapore', 'Dubai'][rand(1) % 6 + 1] AS city,
    toInt16(rand(2) % 3 + 1) AS product_category,
    today() - rand(3) % 365 AS sale_date,
    rand(4) % 100 + 1 AS units_sold,  -- Units between 1-100
    randNormal(50, 15) AS unit_price  -- Normal distribution around $50
FROM numbers(10000);
表裏主要有城市、銷售品類、銷售日期、銷量、單價、銷售額等幾列
提問
我們開始提問 (這裏用的客戶端是 vscode 的 cline 插件)
🏆
各城市的銷售額是多少?哪個商品最暢銷?
llm 的第一次調用
實現
在 GitHub 上大概參考了兩三個實現,實現起來並不複雜。完整代碼可以去 GitHub 看,這裏只講解最重要的 server.py
整體
裏面由 5 個主要的類組成
-
ClickHouseClient: 負責創建 clickhouse 的連接,發起查詢
-
TableMetadataManager:負責查詢表的元數據,例如有哪些列,註釋等信息
-
ResourceManager:負責構造給 LLM 的提示,有哪些 Resource 可以訪問,會調用 TableMetadataManager
-
ToolManager:負責提示 LLM 有哪些 tool 可以使用,以及調用這些 tool,會調用 ClickHouseClient
-
DatabaseServer:整合上面 4 個類的信息
具體實現
ClickHouseClient
class ClickHouseClient:
    """Wrapper around a lazily created ``clickhouse_connect`` client.

    Connection settings are copied from the config object at construction
    time. The underlying client is created on first use and reused for
    every later call.
    """

    def __init__(self, config: Config, logger: Logger):
        """Store the logger and the connection settings read from *config*.

        Args:
            config: Object exposing ``host``, ``port``, ``user``,
                ``password`` and ``database`` attributes.
            logger: Logger used for diagnostics.
        """
        self.logger = logger
        self.db_config = {
            "host": config.host,
            "port": int(config.port),
            "user": config.user,
            "password": config.password,
            "database": config.database,
        }
        self._client = None

    def get_client(self):
        """Return the cached client, creating it on the first call."""
        if self._client is None:
            self._client = self._create_client()
        return self._client

    def _create_client(self):
        """Create a new ClickHouse client from ``self.db_config``.

        Returns:
            The client returned by ``clickhouse_connect.get_client``.

        Raises:
            Exception: Any error raised while creating the client is
                logged and re-raised unchanged.
        """
        try:
            # Mask the password so credentials never end up in log files.
            safe_config = {
                key: ("***" if key == "password" else value)
                for key, value in self.db_config.items()
            }
            self.logger.debug(f"Creating ClickHouse client with config: {safe_config}")
            client = clickhouse_connect.get_client(**self.db_config)
            self.logger.debug(f"ClickHouse server version: {client.server_version}")
            self.logger.info("ClickHouse client created successfully")
            return client
        except Exception as e:
            self.logger.error(f"Failed to create ClickHouse client: {e}")
            raise

    def execute_query(self, query: str, readonly: bool = True):
        """Run *query* and return the result rows as a list of dicts.

        Args:
            query: SQL text to execute.
            readonly: When true (the default) the query is sent with the
                ``readonly`` setting set to 1.

        Returns:
            One dict per result row, mapping column name to value.

        Raises:
            Exception: Any error raised while obtaining the client or
                running the query is logged and re-raised unchanged.
        """
        try:
            client = self.get_client()
            settings = {"readonly": 1} if readonly else {}
            res = client.query(query, settings=settings)
            # Pair every value with its column name.
            rows = [dict(zip(res.column_names, row)) for row in res.result_rows]
            self.logger.debug(f"Query executed successfully: {query}")
            return rows
        except Exception as e:
            self.logger.error(f"Failed to execute query: {e}")
            raise
TableMetadataManager
class TableMetadataManager:
    """Look up table and column metadata and render it as plain text."""

    def __init__(self, client: ClickHouseClient, logger: Logger):
        """Keep references to the query client and the logger."""
        self.logger = logger
        self.client = client

    def get_table_list(self, database: str) -> List[str]:
        """Return the names of the tables in *database*.

        Runs ``SHOW TABLES`` and takes the first value of every result
        row. Returns an empty list when the query yields nothing.
        """
        rows = self.client.execute_query(
            f"SHOW TABLES FROM {quote_identifier(database)}"
        )
        if not rows:
            return []
        names = []
        for row in rows:
            # Take the first value of the row.
            names.append(next(iter(row.values())))
        return names

    def get_table_comments(self, database: str) -> Dict[str, str]:
        """Return a mapping of table name to table comment for *database*."""
        sql = f"SELECT name, comment FROM system.tables WHERE database = {format_query_value(database)}"
        comments = {}
        for row in self.client.execute_query(sql):
            comments[row['name']] = row['comment']
        return comments

    def get_column_comments(self, database: str) -> Dict[str, Dict[str, str]]:
        """Return column comments grouped by table.

        The result maps table name to a dict of column name to comment.
        """
        sql = f"SELECT table, name, comment FROM system.columns WHERE database = {format_query_value(database)}"
        per_table = {}
        for row in self.client.execute_query(sql):
            per_table.setdefault(row['table'], {})[row['name']] = row['comment']
        return per_table

    def format_table_description(self, table_name: str, table_comment: str, columns_info: Dict[str, str]) -> str:
        """Build the text description of one table for the model.

        Args:
            table_name: Name shown on the first line.
            table_comment: Table comment; a placeholder is used when empty.
            columns_info: Mapping of column name to column comment; the
                "Columns" section is omitted when it is empty.

        Returns:
            The newline-terminated description text.
        """
        parts = [f"Table: {table_name}\n"]
        parts.append(
            f"Description: {table_comment}\n"
            if table_comment
            else "Description: No description provided\n"
        )
        if columns_info:
            parts.append("Columns:\n")
            parts.extend(
                f" - {column}: {comment}\n"
                if comment
                else f" - {column}: No description provided\n"
                for column, comment in columns_info.items()
            )
        return "".join(parts)
ResourceManager
class ResourceManager:
    """Expose the tables of the configured database as MCP resources.

    Table and column metadata come from a ``TableMetadataManager`` built
    on the same client; table data is read through the client itself.
    """

    def __init__(self, client: ClickHouseClient, logger: Logger,
                 resource_prefix: str = DEFAULT_RESOURCE_PREFIX,
                 results_limit: int = DEFAULT_RESULTS_LIMIT):
        """Store collaborators and settings.

        Args:
            client: Client used to run queries.
            logger: Logger used for diagnostics.
            resource_prefix: Prefix that every resource URI starts with.
            results_limit: ``LIMIT`` applied when reading a table.
        """
        self.client = client
        self.logger = logger
        self.metadata_manager = TableMetadataManager(client, logger)
        self.resource_prefix = resource_prefix
        self.results_limit = results_limit

    async def list_resources(self) -> List[Resource]:
        """List one resource per table in the configured database.

        Returns:
            The resources, or an empty list when the database has no
            tables or the metadata lookup fails (the error is logged).
        """
        self.logger.debug("Listing resources")
        database = self.client.db_config.get("database")
        try:
            table_list = self.metadata_manager.get_table_list(database)
            if not table_list:
                return []

            table_comments = self.metadata_manager.get_table_comments(database)
            column_comments = self.metadata_manager.get_column_comments(database)

            resources = []
            for table_name in table_list:
                table_comment = table_comments.get(table_name, "")
                columns_info = column_comments.get(table_name, {})
                description = self.metadata_manager.format_table_description(
                    table_name, table_comment, columns_info
                )
                resource = Resource(
                    uri=f"{self.resource_prefix}/{table_name}/data",
                    name=f"Table: {table_name}",
                    mimeType="text/plain",
                    description=description,
                    type="table",
                    metadata={
                        "columns": [
                            {
                                "name": col_name,
                                "description": col_comment,
                            }
                            for col_name, col_comment in columns_info.items()
                        ]
                    },
                )
                resources.append(resource)
            self.logger.debug(f"Found {len(resources)} resources")
            return resources
        except Exception as e:
            # Best effort: a metadata failure yields an empty listing.
            self.logger.error(f"Failed to list resources: {e}")
            return []

    async def read_resource(self, uri: AnyUrl) -> str:
        """Return up to ``results_limit`` rows of the table named in *uri*.

        Args:
            uri: Resource URI in the form produced by ``list_resources``
                (``<prefix>/<table>/data``).

        Returns:
            The rows as indented JSON, ``"No data found"`` for an empty
            result, an empty string for a URI that does not start with
            the prefix or names no table, or an error message when the
            query fails.
        """
        self.logger.debug(f"Reading resource: {uri}")
        uri_str = str(uri)
        try:
            if not uri_str.startswith(self.resource_prefix):
                self.logger.error(f"Invalid resource URI: {uri}")
                return ""

            # list_resources() joins prefix and table with "/". Drop the
            # separator(s) after the prefix so the first path segment is
            # the table name rather than an empty string.
            remainder = uri_str[len(self.resource_prefix):].lstrip("/")
            table_name = remainder.split("/")[0]
            if not table_name:
                self.logger.error(f"Invalid resource URI: {uri}")
                return ""

            query = f"SELECT * FROM {quote_identifier(table_name)} LIMIT {self.results_limit}"
            result = self.client.execute_query(query)

            if not result:
                return "No data found"
            # default=str stringifies values json cannot encode natively.
            return json.dumps(result, default=str, indent=2)
        except Exception as e:
            self.logger.error(f"Failed to read resource: {e}")
            return f"Error reading resource: {str(e)}"
ToolManager
class ToolManager:
    """Advertise the available MCP tools and dispatch calls to them."""

    def __init__(self, client: ClickHouseClient, logger: Logger):
        """Keep references to the query client and the logger."""
        self.client = client
        self.logger = logger

    async def list_tools(self) -> List[Tool]:
        """Return the tool definitions offered to the model.

        The only tool is ``execute_sql``, which takes a required string
        argument ``query``.
        """
        self.logger.debug("Listing tools")
        return [
            Tool(
                # Must match the key used for dispatch in call_tool().
                name="execute_sql",
                description="Execute a query against the ClickHouse database",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "The SQL query to be executed",
                        }
                    },
                    "required": ["query"],
                },
            )
        ]

    async def call_tool(self, name: str, arguments: Dict[str, Any]) -> List[TextContent]:
        """Dispatch a tool call by name.

        Args:
            name: Tool name requested by the caller.
            arguments: Arguments passed through to the tool handler.

        Returns:
            The handler's result, or an empty list when *name* is not a
            known tool (the error is logged).
        """
        self.logger.debug(f"Calling tool: {name} with arguments: {arguments}")
        tool_handlers = {
            "execute_sql": self._handle_execute_sql,
        }
        handler = tool_handlers.get(name)
        if not handler:
            self.logger.error(f"Tool not found: {name}")
            return []
        return await handler(arguments)

    async def _handle_execute_sql(self, arguments: Dict[str, str]) -> List[TextContent]:
        """Run the ``execute_sql`` tool.

        Args:
            arguments: Must contain the SQL text under ``"query"``.

        Returns:
            A single text item with the rows as indented JSON, a single
            text item with an error message when the query is rejected
            by ``dangerous_check`` or fails, or an empty list when no
            query was supplied.
        """
        self.logger.debug("Handling execute_sql tool")
        query = arguments.get("query")
        if not query:
            self.logger.error("Query is required")
            return []

        # Refuse statements flagged by the safety check before executing.
        is_dangerous, pattern = dangerous_check(query)
        if is_dangerous:
            self.logger.error(f"Dangerous query detected: {pattern}")
            return [
                TextContent(
                    type='text',
                    text=f"Error: Dangerous query detected: {pattern}",
                )
            ]

        try:
            result = self.client.execute_query(query)
            json_result = json.dumps(result, default=str, indent=2)
            return [
                TextContent(
                    type='text',
                    text=json_result,
                    mimeType='application/json',
                )
            ]
        except Exception as e:
            self.logger.error(f"Failed to execute query: {e}")
            return [TextContent(type='text', text=f"Error executing query: {str(e)}")]
DatabaseServer
class DatabaseServer:
    """MCP server that wires the ClickHouse client, resources and tools together."""

    def __init__(self, config: Config, logger: Logger):
        """Build the components and register their handlers on the MCP app.

        Args:
            config: Connection settings handed to ``ClickHouseClient``.
            logger: Logger shared by the server and all components.
        """
        self.app = Server("clickhouse_mcp_server")
        self.logger = logger

        # Components
        self.client = ClickHouseClient(config, logger)
        self.resource_manager = ResourceManager(self.client, logger)
        self.tool_manager = ToolManager(self.client, logger)

        # Each app method returns a decorator; apply it to the matching
        # manager method, in the same order as before.
        registrations = (
            (self.app.list_resources, self.resource_manager.list_resources),
            (self.app.read_resource, self.resource_manager.read_resource),
            (self.app.list_tools, self.tool_manager.list_tools),
            (self.app.call_tool, self.tool_manager.call_tool),
        )
        for make_decorator, handler in registrations:
            make_decorator()(handler)

    async def run(self):
        """Serve over the streams provided by ``stdio_server``.

        Raises:
            Exception: Any error raised while the app runs is logged and
                re-raised.
        """
        from mcp.server.stdio import stdio_server

        self.logger.info("Starting server")
        async with stdio_server() as streams:
            reader, writer = streams
            try:
                await self.app.run(
                    reader,
                    writer,
                    self.app.create_initialization_options(),
                )
            except Exception as exc:
                self.logger.error(f"Server error: {exc}")
                raise
結尾
喜歡的話,可以點個 star,使用起來有什麼問題,也可以提 issue 或者留言
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/womAhSogmdjlqkSeZIvqIA