Preface
On the deepin forum I noticed that UOS AI recently integrated MCP, and that there was also an "MCP Scenario Discovery" event (link), so I gave it a try. Who knows, maybe I'd even win a little mouse.
So I asked an LLM to help me build an MCP application that drives a local Ollama instance.
Introduction
The functionality is simple: you type a request such as
请帮我 xxx ("please help me do xxx")
and the MCP application then does the following (a sketch of a typical generated script follows the list):
- It calls a local Ollama model to generate Python code for xxx.
- It creates a virtual environment, or reuses one created earlier.
- It installs dependencies automatically, or your LLM can install them by sending pip install xxx.
- It executes the code and emits logs.
This amounts to an MCP tool that can help you with almost any task!
At runtime the service creates a folder for each task's generated .py file and shares a single venv across tasks. When DeepSeek (or another LLM) hits a dependency error while running the task, it will often reply with pip install xxx; that command is recognized separately and executed directly.
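For a sense of what the model is asked to produce, here is a hypothetical example of a generated script that follows the conventions enforced by the prompt in the code below: `# REQUIRE:` dependency markers, logging to `task.log`, and the literal success string the server checks for. The URL and task are placeholders of my own, not real output:

```python
# REQUIRE: requests
import logging

# Log to both stdout and task.log in the task's working directory
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler(), logging.FileHandler("task.log")],
)

try:
    import requests  # installed via the # REQUIRE marker above
    resp = requests.get("https://example.com", timeout=30)
    resp.raise_for_status()
    logging.info("Fetched %d bytes", len(resp.content))
    print("任务成功完成")  # the exact marker run_workflow() looks for
except Exception as e:
    logging.error("Request failed: %s", e)
```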
Quick demo
The screenshots show DeepSeek + UOS AI + this MCP service.
Recoloring an SVG
Turning a small blue Bilibili icon pink.
Text generation
Filling a text file with some nonsense.
Networking
Fetching Bing wallpaper images to local disk.
A more complex task
Extracting the characters 天影大侠 from the MiSans font and rendering them as a white image with a black border.
And of course, Python can do practically anything.
The code (updated 2025/8/28)
```python
import os
import re
import sys
import json
import logging
import subprocess
import datetime
from pathlib import Path
from typing import Dict, Any, Optional, Set, Tuple

# Import the MCP core library
from mcp.server.fastmcp import FastMCP, Context

# Import LangChain and Pydantic
from langchain_community.llms import Ollama
from pydantic import BaseModel, Field

# --- Global logging setup (for the service itself) ---
# This logger records core service events such as startup and shutdown, not per-task logs
global_logger = logging.getLogger("mcp_service")
if not global_logger.handlers:
    global_logger.setLevel(logging.INFO)
    handler = logging.StreamHandler(sys.stdout)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    global_logger.addHandler(handler)
# --- MCP server definition ---
mcp = FastMCP(name="SkyShadowHero Task Execution Server")

# --- Pydantic response models ---
class ExecutionStage(BaseModel):
    code_generation: str
    dependency_installation: str
    execution: str

class ExecutionResult(BaseModel):
    stages: ExecutionStage
    code: str
    output: str
    error: str
    work_dir: str
    returncode: int

class ServerInfo(BaseModel):
    name: str
    model: Optional[str]
    status: str

class CommandOutput(BaseModel):
    status: str
    result: ExecutionResult
    server: ServerInfo
# --- Configuration management ---
class MCPConfig:
    _instance = None
    config_path = Path(__file__).parent / "mcp_config.json"

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.load_config()
        return cls._instance

    def load_config(self):
        if not self.config_path.exists():
            self.config = {
                "mcpServers": {
                    "default": {
                        "model": "deepseek-coder-v2:16b",
                        "task_templates": {
                            "file_operations": {
                                "prompt": (
                                    "你是一个顶级的Python自动化专家。你的任务是根据用户的自然语言指令,生成一段完整、健壮、可直接在标准Python环境中执行的脚本。你拥有完全的创作自由,但必须严格遵守以下规范。\n\n"
                                    "## 用户指令:\n"
                                    "{task}\n\n"
                                    "## 代码生成规范 (必须严格遵守):\n"
                                    "1. **【代码纯净性】**: 你的输出必须是纯粹的Python代码。绝对禁止包含任何Markdown标记,尤其是 ` ```python ` 和 ` ``` `。\n"
                                    "2. **【依赖声明】**: 如果代码需要任何第三方库 (例如 `requests`, `pandas`),必须在代码的最开始,使用 `# REQUIRE: <package_name>` 的格式进行声明。**每个依赖独立一行**。如果不需要任何第三方库,则完全不要写 `# REQUIRE:` 注释。\n"
                                    "3. **【日志记录】**: 必须使用Python的 `logging` 模块。在脚本开始处配置好 `basicConfig`,确保日志同时输出到控制台(stdout)和当前工作目录下的 `task.log` 文件。在关键步骤和任何 `except` 块中,都必须使用 `logging.info()` 或 `logging.error()` 进行记录。\n"
                                    "4. **【错误处理】**: 所有可能失败的操作都必须被包含在 `try...except Exception as e:` 块中。\n"
                                    "5. **【成功信号】**: 在脚本所有操作成功完成的最后,必须调用 `print(\"任务成功完成\")`。\n"
                                    "6. **【完整性】**: 生成的代码必须是完整的、自包含的,包含所有必要的 `import` 语句。\n\n"
                                    "现在,请根据用户指令生成代码。"
                                ),
                            }
                        }
                    }
                }
            }
            self.save_config()
        else:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                self.config = json.load(f)

    def save_config(self):
        with open(self.config_path, 'w', encoding='utf-8') as f:
            json.dump(self.config, f, indent=2, ensure_ascii=False)

    def get_server(self, name: str) -> Optional[Dict[str, Any]]:
        return self.config.get("mcpServers", {}).get(name)
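# On first run, load_config() writes a default mcp_config.json next to this
# script, shaped like:
#   {"mcpServers": {"default": {"model": "deepseek-coder-v2:16b",
#       "task_templates": {"file_operations": {"prompt": "..."}}}}}
# Edit that file to switch models or tweak the prompt.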
# --- Core logic (final fixed architecture) ---
class TaskWorkflow:
    def __init__(self):
        self.config = MCPConfig().config
        self.llm_cache = {}
        self.standard_libs = self._get_standard_libs()
        script_dir = Path(__file__).parent.resolve()
        self.shared_work_dir = script_dir / "mcp_tasks"
        self.shared_work_dir.mkdir(exist_ok=True)
        global_logger.info("正在初始化并检查共享虚拟环境...")
        try:
            self.venv_path = self.shared_work_dir / "venv"
            self.python_executable, self.pip_executable = self._create_virtual_env(str(self.venv_path))
            global_logger.info(f"共享虚拟环境已就绪。Python: {self.python_executable}, Pip: {self.pip_executable}")
        except Exception as e:
            global_logger.error(f"初始化共享虚拟环境失败: {e}", exc_info=True)
            raise RuntimeError(f"无法创建或验证共享虚拟环境,服务无法启动。错误: {e}")
    def _get_standard_libs(self) -> Set[str]:
        # Fallback list for older interpreters; Python 3.10+ exposes sys.stdlib_module_names
        common_libs = {'os', 'sys', 'json', 're', 'logging', 'subprocess', 'pathlib', 'datetime', 'time', 'math', 'random', 'collections', 'itertools', 'functools', 'glob', 'shutil', 'tempfile', 'argparse', 'typing', '__future__'}
        if sys.version_info >= (3, 10):
            try:
                from sys import stdlib_module_names
                return set(stdlib_module_names)
            except ImportError:
                return common_libs
        return common_libs

    async def get_llm(self, model_name: str) -> Ollama:
        if model_name not in self.llm_cache:
            global_logger.info(f"正在加载模型: {model_name}")
            self.llm_cache[model_name] = Ollama(model=model_name, temperature=0.1, top_p=0.9, timeout=300)
        return self.llm_cache[model_name]
    def _create_virtual_env(self, venv_path_str: str) -> Tuple[str, str]:
        venv_path = Path(venv_path_str)
        if sys.platform == "win32":
            python_exe = venv_path / 'Scripts' / 'python.exe'
            pip_exe = venv_path / 'Scripts' / 'pip.exe'
        else:
            python_exe = venv_path / 'bin' / 'python'
            pip_exe = venv_path / 'bin' / 'pip'
        if not python_exe.exists() or not pip_exe.exists():
            global_logger.info(f"共享虚拟环境不完整或不存在,正在创建于: {venv_path_str}")
            try:
                subprocess.run(
                    [sys.executable, "-m", "venv", venv_path_str],
                    check=True, capture_output=True, text=True, timeout=120
                )
            except subprocess.CalledProcessError as e:
                global_logger.error(f"创建虚拟环境失败: {e.stderr}")
                raise RuntimeError(f"创建虚拟环境失败,错误: {e.stderr}")
        if not python_exe.exists() or not pip_exe.exists():
            raise FileNotFoundError("虚拟环境创建后,未找到 Python/Pip 可执行文件。")
        global_logger.info("虚拟环境验证成功。")
        return str(python_exe), str(pip_exe)
    def _post_process_code(self, generated_code: str) -> Tuple[str, Set[str]]:
        # Strip any Markdown fences the model emitted despite the prompt rules
        cleaned_code = re.sub(r"```python\n|```", "", generated_code).strip()
        # Collect declared third-party dependencies, then drop the marker lines
        required_deps = set(re.findall(r"#\s*REQUIRE:\s*(\S+)", cleaned_code))
        final_code = "\n".join([line for line in cleaned_code.split('\n') if not line.strip().startswith("# REQUIRE:")])
        global_logger.info(f"代码后处理完成。提取的依赖: {required_deps or '无'}。")
        return final_code.strip(), required_deps
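    # Example: _post_process_code('# REQUIRE: requests\nimport requests\n...')
    # returns the code with the REQUIRE line removed, plus the set {"requests"}.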
    def _install_dependencies(self, deps: Set[str], task_work_dir: Path):
        if not deps:
            global_logger.info("代码中未发现 # REQUIRE: 声明,跳过依赖安装。")
            return
        deps_to_install = {dep for dep in deps if dep.lower() not in self.standard_libs}
        if not deps_to_install:
            global_logger.info(f"所有声明的依赖 {list(deps)} 均为标准库,无需安装。")
            return
        # Write the declared dependencies to requirements.txt
        requirements_path = task_work_dir / "requirements.txt"
        with open(requirements_path, 'w', encoding='utf-8') as f:
            for dep in deps_to_install:
                f.write(f"{dep}\n")
        global_logger.info(f"已生成依赖文件: {requirements_path}")
        command = [self.pip_executable, "install", "-r", str(requirements_path)]
        global_logger.info(f"执行依赖安装命令: {' '.join(command)}")
        result = subprocess.run(
            command,
            cwd=str(task_work_dir),
            capture_output=True,
            text=True,
            timeout=300,
            check=False,
            encoding='utf-8',
        )
        if result.returncode != 0:
            error_message = f"依赖安装失败: {result.stderr}"
            global_logger.error(error_message)
            raise RuntimeError(error_message)
        global_logger.info(f"依赖 {list(deps_to_install)} 安装成功。")
    def _execute_code(self, code_to_execute: str, task_work_dir: str) -> Dict[str, Any]:
        script_name = "generated_script.py"
        code_path = os.path.join(task_work_dir, script_name)
        with open(code_path, "w", encoding="utf-8") as f:
            f.write(code_to_execute)
        global_logger.info(f"最终执行的脚本已保存至: {code_path}")
        command = [self.python_executable, script_name]
        global_logger.info(f"执行代码命令: {' '.join(command)}")
        result = subprocess.run(
            command,
            cwd=task_work_dir,
            capture_output=True,
            text=True,
            timeout=300,
            check=False,
            encoding='utf-8',
        )
        return {"output": result.stdout, "error": result.stderr, "returncode": result.returncode}
    def _is_direct_command(self, instruction: str) -> bool:
        # Simple prefix sniffing: treat instructions that start with a known
        # shell command (e.g. "pip install xxx") as direct commands
        common_commands = ['python', 'pip', 'uv', 'bash', 'sh', 'ls', 'rm', 'cp', 'mv', 'mkdir']
        return any(instruction.strip().startswith(cmd) for cmd in common_commands)

    def _execute_direct_command(self, command: str, task_work_dir: str) -> Dict[str, Any]:
        global_logger.info(f"检测到直接命令,将在虚拟环境中执行: {command}")
        # Source the venv's activate script so the command runs inside the shared env
        if sys.platform == "win32":
            activate_script = Path(self.python_executable).parent / "activate.bat"
            full_command = f'call "{activate_script}" && {command}'
        else:
            activate_script = Path(self.python_executable).parent / "activate"
            full_command = f'. "{activate_script}" && {command}'
        result = subprocess.run(
            full_command,
            cwd=task_work_dir,
            capture_output=True,
            text=True,
            timeout=300,
            check=False,
            encoding='utf-8',
            shell=True,
            executable='/bin/bash' if sys.platform != "win32" else None
        )
        return {"output": result.stdout, "error": result.stderr, "returncode": result.returncode}
    async def run_workflow(self, instruction: str, server_name: str, ctx: Context):
        result = {
            "stages": {
                "code_generation": "pending",
                "dependency_installation": "pending",
                "execution": "pending"
            },
            "code": "", "output": "", "error": "", "work_dir": "", "returncode": -1
        }
        try:
            timestamp = datetime.datetime.now().strftime("task_%Y%m%d_%H%M%S")
            task_work_dir = self.shared_work_dir / timestamp
            task_work_dir.mkdir(exist_ok=True)
            result["work_dir"] = str(task_work_dir)
            global_logger.info(f"任务 '{timestamp}' 启动,指令: '{instruction}'")
            await ctx.info(f"任务工作目录已创建: {task_work_dir}")

            # --- Instruction sniffing and mode switching ---
            if self._is_direct_command(instruction):
                await ctx.info("检测到直接命令模式。")
                result["stages"]["code_generation"] = "skipped (direct command)"
                result["stages"]["dependency_installation"] = "skipped (direct command)"
                result["code"] = f"# Direct Command Execution\n{instruction}"
                result["stages"]["execution"] = "pending"
                await ctx.info(f"正在直接执行命令: {instruction}")
                exec_result = self._execute_direct_command(instruction, str(task_work_dir))
                result.update(exec_result)
                is_successful = exec_result.get("returncode") == 0
                result["stages"]["execution"] = "success" if is_successful else "failed"
                final_status = "success" if is_successful else "failed"
                global_logger.info(f"直接命令执行完成。状态: {final_status}。")
                return {"status": final_status, "result": result, "server": {"name": server_name, "model": "N/A (Direct Command)", "status": "active"}}

            # --- Not a direct command: run the original LLM workflow ---
            await ctx.info("进入 LLM 代码生成模式。")
            server_config = self.config.get("mcpServers", {}).get(server_name)
            if not server_config:
                raise ValueError(f"服务器 '{server_name}' 未配置")
            template = server_config["task_templates"]["file_operations"]
            prompt = template["prompt"].format(task=instruction)
            await ctx.info(f"正在使用模型 '{server_config['model']}' 生成代码...")
            llm = await self.get_llm(server_config['model'])
            generated_code = await llm.ainvoke(prompt)
            result["stages"]["code_generation"] = "success"
            await ctx.info("代码生成成功。")
            pure_code, dependencies = self._post_process_code(generated_code)
            result["code"] = pure_code
            result["stages"]["dependency_installation"] = "pending"
            await ctx.info(f"正在检查并安装依赖: {dependencies or '无'}")
            self._install_dependencies(dependencies, task_work_dir)
            result["stages"]["dependency_installation"] = "success"
            await ctx.info("所有依赖已就绪。")
            result["stages"]["execution"] = "pending"
            await ctx.info("正在执行生成的代码...")
            exec_result = self._execute_code(pure_code, str(task_work_dir))
            result.update(exec_result)
            # Success is signalled by the marker string the prompt asks the
            # generated script to print: "任务成功完成"
            is_successful = "任务成功完成" in exec_result.get("output", "")
            result["stages"]["execution"] = "success" if is_successful else "failed"
            final_status = "success" if is_successful else "failed"
            global_logger.info(f"代码执行完成。状态: {final_status}。")
            return {"status": final_status, "result": result, "server": {"name": server_name, "model": server_config.get("model"), "status": "active"}}
        except Exception as e:
            current_stage = next((s for s, status in result["stages"].items() if status == "pending"), "unknown")
            result["stages"][current_stage] = "failed"
            error_message = f"在 '{current_stage}' 阶段失败: {e}"
            result["error"] = error_message
            global_logger.error(error_message, exc_info=True)
            await ctx.error(error_message)
            for stage, status in result["stages"].items():
                if status == "pending":
                    result["stages"][stage] = "skipped"
            return {"status": "failed", "result": result, "server": {"name": server_name, "model": self.config.get("mcpServers", {}).get(server_name, {}).get("model"), "status": "error"}}
# --- Singleton and tool definition ---
workflow_executor = TaskWorkflow()

@mcp.tool()
async def do_it_using_python(
    instruction: str = Field(
        description="用户用自然语言下达的、需要在本地计算机上执行的具体任务指令。例如:'在桌面上创建一个名为'todo.txt'的文件' 或 '将/home/user/docs目录下的所有.log文件压缩成一个zip包'。"
    ),
    server: str = Field(default="default", description="要使用的服务器配置名称。"),
    ctx: Context = Field(exclude=True)
) -> CommandOutput:
    """
    当用户需要执行任何与文件、文件夹或系统相关的本地操作时,请使用此工具。
    此工具接收一条自然语言指令,将其转换为可执行的Python代码,并在安全的虚拟环境中运行,然后返回详细的执行结果。
    """
    try:
        await ctx.info(f"收到指令,开始处理: '{instruction}'")
        result_dict = await workflow_executor.run_workflow(
            instruction=instruction,
            server_name=server,
            ctx=ctx
        )
        await ctx.info("任务流程执行完毕。")
        return CommandOutput.model_validate(result_dict)
    except Exception as e:
        global_logger.error(f"执行工具时发生严重错误: {e}", exc_info=True)
        await ctx.error(f"执行工具时发生严重错误: {e}")
        return CommandOutput(
            status="failed",
            result=ExecutionResult(
                stages=ExecutionStage(
                    code_generation="failed",
                    dependency_installation="skipped",
                    execution="skipped"
                ),
                code="",
                output="",
                error=f"执行工具时发生顶层错误: {e}",
                work_dir="",
                returncode=-1
            ),
            server=ServerInfo(name=server, model=None, status="error")
        )
# --- Server startup ---
def run():
    """Main entry point for the server."""
    try:
        # Quick health check: "ollama list" succeeds only if Ollama is reachable
        subprocess.run(["ollama", "list"], check=True, capture_output=True, text=True)
        global_logger.info("Ollama 服务已在运行。")
    except (subprocess.CalledProcessError, FileNotFoundError):
        global_logger.warning("Ollama服务未运行,请确保Ollama已安装并正在运行。")
    mcp.run()

if __name__ == "__main__":
    run()
```
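If you want to poke at the server without UOS AI, the official MCP Python SDK ships a stdio client. Below is a minimal sketch, assuming the code above is saved as `server.py` and the `mcp` package is installed; the instruction string is just an example:

```python
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def main():
    # Launch server.py as a subprocess and talk to it over stdio
    params = StdioServerParameters(command="python", args=["server.py"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool(
                "do_it_using_python",
                {"instruction": "在当前目录创建 hello.txt,内容为 hello"},
            )
            print(result.content)

asyncio.run(main())
```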
And that's it: a do-almost-anything tool built from MCP + Ollama + Python!
Copyright
- Author: 天影大侠
- Link: https://blog.skyshadow.fun/posts/28/
- License: Unless otherwise noted, all posts on this blog are licensed under CC BY-NC-SA 4.0. Please credit this site when reposting.