/** * @file python_execute.cpp * @brief OpenManus Python执行工具实现 * * 这个文件实现了OpenManus的Python执行工具,使用Python.h直接调用Python解释器。 */ #include "mcp/include/mcp_server.h" #include "mcp/include/mcp_tool.h" #include "mcp/include/mcp_resource.h" #include #include #include #include // 检查是否找到Python #ifdef PYTHON_FOUND #include #endif /** * @class python_interpreter * @brief Python解释器类,用于执行Python代码 */ class python_interpreter { public: /** * @brief 构造函数,初始化Python解释器 */ python_interpreter() { #ifdef PYTHON_FOUND Py_Initialize(); #endif } /** * @brief 析构函数,释放Python解释器 */ ~python_interpreter() { #ifdef PYTHON_FOUND Py_Finalize(); #endif } /** * @brief 执行Python代码 * @param input 包含Python代码的JSON对象 * @return 执行结果的JSON对象 */ mcp::json forward(const mcp::json& input) const { #ifdef PYTHON_FOUND if (input.contains("code") && input["code"].is_string()) { std::string code = input["code"].get(); // Create scope to manage Python objects automatically PyObject *main_module = PyImport_AddModule("__main__"); PyObject *main_dict = PyModule_GetDict(main_module); PyObject *sys_module = PyImport_ImportModule("sys"); PyObject *io_module = PyImport_ImportModule("io"); PyObject *string_io = PyObject_GetAttrString(io_module, "StringIO"); PyObject *sys_stdout = PyObject_CallObject(string_io, nullptr); PyObject *sys_stderr = PyObject_CallObject(string_io, nullptr); // Replace sys.stdout and sys.stderr with our StringIO objects PySys_SetObject("stdout", sys_stdout); PySys_SetObject("stderr", sys_stderr); // Execute the Python code PyObject *result = PyRun_String(code.c_str(), Py_file_input, main_dict, main_dict); if (!result) { PyErr_Print(); } Py_XDECREF(result); // Fetch the output and error from the StringIO object PyObject *out_value = PyObject_CallMethod(sys_stdout, "getvalue", nullptr); PyObject *err_value = PyObject_CallMethod(sys_stderr, "getvalue", nullptr); // Convert Python string to C++ string std::string output = PyUnicode_AsUTF8(out_value); std::string error = PyUnicode_AsUTF8(err_value); // Restore the original sys.stdout and sys.stderr PySys_SetObject("stdout", PySys_GetObject("stdout")); PySys_SetObject("stderr", PySys_GetObject("stderr")); // Clean up Py_DECREF(sys_stdout); Py_DECREF(sys_stderr); Py_DECREF(string_io); Py_DECREF(io_module); Py_DECREF(sys_module); // Prepare JSON output mcp::json result_json; if (!output.empty()) { result_json["output"] = output; } if (!error.empty()) { result_json["error"] = error; } if (result_json.empty()) { std::string last_line; std::istringstream code_stream(code); while (std::getline(code_stream, last_line, '\n')) {} size_t pos = last_line.find_last_of(';') + 1; pos = last_line.find("=") + 1; while (pos < last_line.size() && isblank(last_line[pos])) { pos++; } if (pos != std::string::npos) { last_line = last_line.substr(pos); } return mcp::json{{"warning", "No output. Maybe try with print(" + last_line + ")?"}}; } return result_json; } else { return mcp::json{{"error", "Invalid parameters or code not provided"}}; } #else return mcp::json{{"error", "Python interpreter not available"}}; #endif } }; // 全局Python解释器实例 static python_interpreter interpreter; // Python执行工具处理函数 mcp::json python_execute_handler(const mcp::json& args) { if (!args.contains("code")) { std::cout << args.dump() << std::endl; throw mcp::mcp_exception(mcp::error_code::invalid_params, "缺少'code'参数"); } try { // 使用Python解释器执行代码 mcp::json result = interpreter.forward(args); // 格式化输出结果 std::string output; if (result.contains("output")) { output += result["output"].get(); } if (result.contains("error")) { output += result["error"].get(); } if (result.contains("warning")) { output += result["warning"].get(); } if (output.empty()) { output = "Executed successfully but w/o output. Maybe you should print more information."; } return {{ {"type", "text"}, {"text", output} }}; } catch (const std::exception& e) { throw mcp::mcp_exception(mcp::error_code::internal_error, "执行Python代码失败: " + std::string(e.what())); } } // 注册Python执行工具的函数 void register_python_execute_tool(mcp::server& server) { // 注册PythonExecute工具 mcp::tool python_tool = mcp::tool_builder("PythonExecute") .with_description("执行Python代码并返回结果") .with_string_param("code", "要执行的Python代码", true) .build(); server.register_tool(python_tool, python_execute_handler); }