|
马上注册,结交更多好友,享用更多功能^_^
您需要 登录 才可以下载或查看,没有账号?立即注册
x
本帖最后由 快速收敛 于 2025-5-7 16:29 编辑
在帖子MCP官方示例+Cherry Studio尝试中,我们创建了MCP官方示例中的天气接口服务,并用软件Cherry Studio用作客户端接入了MCP服务,完成了简单的MCP使用。
接下来我们看下官方通过代码实现MCP客户端来连接MCP服务端。官方代码使用的是Anthropic,由于不能访问国外,使用不了Claude的api,我都换成了通义千问的大模型的api,经过测试也能完成测试。
1. 创建项目
- # 创建项目目录
- uv init mcp-client
- cd mcp-client
- # 创建虚拟环境
- uv venv
- # 激活虚拟环境
- # 在 Windows 上:
- .venv\Scripts\activate
- # 在 Unix 或 MacOS 上:
- source .venv/bin/activate
- # 安装所需的包
- uv add mcp anthropic python-dotenv
- # 删除样板文件
- rm main.py
- # 创建我们的主文件
- touch client.py
复制代码
2. 首先需要到阿里百炼大模型平台申请API秘钥,将秘钥保存至项目目录中的.env文件中
3. 阿里的api可以使用openai的sdk,完全兼容,所以只需要安装openai库即可。将官方示例的代码修改为:
- import asyncio
- from typing import Optional
- from contextlib import AsyncExitStack
- from mcp import ClientSession, StdioServerParameters
- from mcp.client.stdio import stdio_client
- from openai import OpenAI
- from dotenv import load_dotenv
- import os
- import json
- load_dotenv() # load environment variables from .env
- class MCPClient:
- def __init__(self):
- # Initialize session and client objects
- self.session: Optional[ClientSession] = None
- self.exit_stack = AsyncExitStack()
- self.openai = OpenAI(
- api_key=os.getenv("DASHSCOPE_API_KEY"), # 如果您没有配置环境变量,请用阿里云百炼API Key将本行替换为:api_key="sk-xxx"
- base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", # 填写DashScope base_url
- )
复制代码
4. 创建链接到MCP服务的方法,这一部分代码不用修改:
- async def connect_to_server(self, server_script_path: str):
- """Connect to an MCP server
- Args:
- server_script_path: Path to the server script (.py or .js)
- """
- is_python = server_script_path.endswith('.py')
- is_js = server_script_path.endswith('.js')
- if not (is_python or is_js):
- raise ValueError("Server script must be a .py or .js file")
- command = "python" if is_python else "node"
- server_params = StdioServerParameters(
- command=command,
- args=[server_script_path],
- env=None
- )
- stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
- self.stdio, self.write = stdio_transport
- self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
- await self.session.initialize()
- # List available tools
- response = await self.session.list_tools()
- tools = response.tools
- print("\nConnected to server with tools:", [tool.name for tool in tools])
复制代码
5. 调用大模型,查询天气接口,这一部分修改为openai风格,使用千问大模型:
- async def process_query(self, query: str) -> str:
- """Process a query using OpenAI and available tools"""
- messages = [{
- "role": "user",
- "content": query
- }]
- # 获取可用工具列表
- response = await self.session.list_tools()
- available_tools = [{
- "type": "function",
- "function": {
- "name": tool.name,
- "description": tool.description,
- "parameters": tool.inputSchema
- }
- } for tool in response.tools]
- final_text = []
- while True:
- # OpenAI API调用
- response = self.openai.chat.completions.create(
- model="qwen-plus",
- messages=messages,
- tools=available_tools,
- tool_choice="auto",
- max_tokens=1000
- )
- message = response.choices[0].message
- final_text.append(message.content)
- # 如果没有工具调用则结束
- if not message.tool_calls:
- break
- # 处理所有工具调用
- tool_responses = []
- for tool_call in message.tool_calls:
- tool_name = tool_call.function.name
- tool_args = json.loads(tool_call.function.arguments)
- # 执行工具调用
- result = await self.session.call_tool(tool_name, tool_args)
- final_text.append(f"[Called {tool_name} with args {tool_args}]")
- # 记录工具响应
- tool_responses.append({
- "tool_call_id": tool_call.id,
- "role": "tool",
- "name": tool_name,
- "content": result.content
- })
- # 将工具响应添加到消息历史
- messages.append({
- "role": "assistant",
- "content": message.content,
- "tool_calls": [
- {
- "id": tc.id,
- "type": "function",
- "function": {
- "name": tc.function.name,
- "arguments": tc.function.arguments
- }
- } for tc in message.tool_calls
- ]
- })
- messages.extend(tool_responses)
- return "\n".join([t for t in final_text if t is not None])
复制代码
6. 聊天交互式循环,这一部分不用修改:
- async def chat_loop(self):
- """Run an interactive chat loop"""
- print("\nMCP Client Started!")
- print("Type your queries or 'quit' to exit.")
- while True:
- query = input("\nQuery: ").strip()
- if query.lower() == 'quit':
- break
- response = await self.process_query(query)
- print("\n" + response)
- async def cleanup(self):
- """Clean up resources"""
- await self.exit_stack.aclose()
复制代码
7. 运行函数,指定之前的MCP服务的python路径:
- async def main():
- if len(sys.argv) < 2:
- print("Usage: python client.py <path_to_server_script>")
- sys.exit(1)
- client = MCPClient()
- try:
- await client.connect_to_server(sys.argv[1])
- await client.chat_loop()
- finally:
- await client.cleanup()
- if __name__ == "__main__":
- import sys
- asyncio.run(main())
复制代码
代码完成,开启调试:
运行MCP客户端,并指定服务端
程序返回了可调用的MCP工具方法,并让用户输入问题。还是使用之前的问题:请问纽约今天的天气怎么样?
可以看见程序又列出了可以使用的工具,千问大模型获取到纽约的经纬度,并通过经纬度调用天气信息,再通过千问大模型总结输入.
有点意思 ,但能干啥?
|
评分
-
参与人数 2 | 荣誉 +5 |
鱼币 +6 |
贡献 +6 |
C币 +1 |
收起
理由
|
不二如是
| + 2 |
+ 3 |
+ 3 |
+ 1 |
MCP未来热度趋势 |
慢慢即漫漫
| + 3 |
+ 3 |
+ 3 |
|
无条件支持楼主! |
查看全部评分
|