humanus.cpp/server/python_execute.cpp

179 lines
5.8 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

/**
* @file python_execute.cpp
* @brief Implementation of the OpenManus Python execution tool
*
* This file implements the OpenManus Python execution tool, which uses Python.h to call the Python interpreter directly.
*/
#include "mcp/include/mcp_server.h"
#include "mcp/include/mcp_tool.h"
#include "mcp/include/mcp_resource.h"
#include <cctype>
#include <iostream>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <string>
// Include the Python C API only if Python was found (PYTHON_FOUND defined)
#ifdef PYTHON_FOUND
#include <Python.h>
#endif
/**
 * @class python_interpreter
 * @brief Runs Python source through an embedded CPython interpreter and captures its output.
 *
 * When PYTHON_FOUND is defined, constructing an instance calls Py_Initialize() and
 * destroying it calls Py_Finalize(). Without PYTHON_FOUND, forward() only returns
 * an error object.
 */
class python_interpreter {
private:
#ifdef PYTHON_FOUND
    /// Deleter that drops one strong reference, so std::unique_ptr can own a PyObject.
    struct py_decref {
        void operator()(PyObject* object) const { Py_XDECREF(object); }
    };
    using py_ref = std::unique_ptr<PyObject, py_decref>;

    /// Turns a borrowed reference (possibly NULL) into an owned one.
    static py_ref retain(PyObject* borrowed) {
        Py_XINCREF(borrowed);
        return py_ref{borrowed};
    }

    /// Returns the text accumulated in a StringIO object, or "" if it cannot be read.
    static std::string read_capture(PyObject* buffer) {
        py_ref text{PyObject_CallMethod(buffer, "getvalue", nullptr)};
        const char* utf8 = text ? PyUnicode_AsUTF8(text.get()) : nullptr;
        if (utf8 == nullptr) {
            // getvalue() or the UTF-8 conversion failed; do not leave the error pending.
            PyErr_Clear();
            return std::string{};
        }
        // Copy while `text` is still alive: the buffer belongs to the unicode object.
        std::string value(utf8);
        return value;
    }
#endif

    /**
     * @brief Heuristically picks the expression to suggest inside print(...).
     *
     * Takes the last non-blank line of @p code, keeps only the statement after the
     * last ';', and, if that statement contains '=', keeps only what follows the
     * first '='. Leading blanks of the result are removed.
     */
    static std::string last_expression(const std::string& code) {
        std::string last_line;
        std::string line;
        std::istringstream lines(code);
        while (std::getline(lines, line)) {
            // Remember only lines with content so a trailing newline does not erase the hint.
            if (line.find_first_not_of(" \t\r") != std::string::npos) {
                last_line = line;
            }
        }

        // Drop trailing whitespace and statement terminators ("x = 1;  ").
        const std::size_t last_kept = last_line.find_last_not_of(" \t\r;");
        if (last_kept == std::string::npos) {
            return std::string{};
        }
        last_line.erase(last_kept + 1);

        std::size_t pos = 0;
        const std::size_t semicolon = last_line.find_last_of(';');
        if (semicolon != std::string::npos) {
            pos = semicolon + 1;
        }
        const std::size_t equals = last_line.find('=', pos);
        if (equals != std::string::npos) {
            pos = equals + 1;
        }
        while (pos < last_line.size() &&
               std::isblank(static_cast<unsigned char>(last_line[pos]))) {
            ++pos;
        }
        return last_line.substr(pos);
    }

public:
    /**
     * @brief Constructor; initializes the Python interpreter when PYTHON_FOUND is defined.
     */
    python_interpreter() {
#ifdef PYTHON_FOUND
        Py_Initialize();
#endif
    }

    /**
     * @brief Destructor; finalizes the Python interpreter when PYTHON_FOUND is defined.
     */
    ~python_interpreter() {
#ifdef PYTHON_FOUND
        Py_Finalize();
#endif
    }

    /**
     * @brief Executes Python code.
     * @param input JSON object whose "code" member is a string of Python source.
     * @return JSON object with "output" (captured stdout) and/or "error" (captured
     *         stderr, including tracebacks); {"warning": ...} when nothing was
     *         printed; {"error": ...} when the input is invalid, the execution
     *         environment cannot be prepared, or Python is not available.
     */
    mcp::json forward(const mcp::json& input) const {
#ifdef PYTHON_FOUND
        if (!input.contains("code") || !input["code"].is_string()) {
            return mcp::json{{"error", "Invalid parameters or code not provided"}};
        }
        const std::string code = input["code"].get<std::string>();

        // NOTE(review): no GIL handling here; this assumes forward() runs on the
        // thread that initialized the interpreter - confirm against the server's
        // threading model.

        // Both of these are borrowed references and must not be released.
        PyObject* main_module = PyImport_AddModule("__main__");
        PyObject* main_dict = main_module ? PyModule_GetDict(main_module) : nullptr;

        py_ref io_module{PyImport_ImportModule("io")};
        py_ref string_io{io_module ? PyObject_GetAttrString(io_module.get(), "StringIO")
                                   : nullptr};
        py_ref captured_out{string_io ? PyObject_CallObject(string_io.get(), nullptr)
                                      : nullptr};
        py_ref captured_err{string_io ? PyObject_CallObject(string_io.get(), nullptr)
                                      : nullptr};
        if (!main_dict || !captured_out || !captured_err) {
            PyErr_Clear();
            return mcp::json{{"error", "Failed to prepare the Python execution environment"}};
        }

        // Keep the real streams alive so they can be put back after the run.
        py_ref saved_out = retain(PySys_GetObject("stdout"));
        py_ref saved_err = retain(PySys_GetObject("stderr"));
        PySys_SetObject("stdout", captured_out.get());
        PySys_SetObject("stderr", captured_err.get());

        // Execute the code in the __main__ namespace.
        bool exit_requested = false;
        py_ref run_result{PyRun_String(code.c_str(), Py_file_input, main_dict, main_dict)};
        if (!run_result) {
            if (PyErr_ExceptionMatches(PyExc_SystemExit)) {
                // PyErr_Print() would terminate the whole process on SystemExit.
                PyErr_Clear();
                exit_requested = true;
            } else {
                // Writes the traceback to sys.stderr, i.e. into captured_err.
                PyErr_Print();
            }
        }
        run_result.reset();

        // Restore the original streams; a NULL value removes the attribute again.
        PySys_SetObject("stdout", saved_out.get());
        PySys_SetObject("stderr", saved_err.get());

        const std::string output = read_capture(captured_out.get());
        std::string error = read_capture(captured_err.get());
        if (exit_requested) {
            error += "SystemExit raised by the executed code";
        }

        // Prepare JSON output
        mcp::json result_json;
        if (!output.empty()) {
            result_json["output"] = output;
        }
        if (!error.empty()) {
            result_json["error"] = error;
        }
        if (result_json.empty()) {
            return mcp::json{{"warning", "No output. Maybe try with print(" +
                                             last_expression(code) + ")?"}};
        }
        return result_json;
#else
        return mcp::json{{"error", "Python interpreter not available"}};
#endif
    }
};
// Global Python interpreter instance. Its constructor calls Py_Initialize() and its
// destructor calls Py_Finalize() (both only when PYTHON_FOUND is defined), so the
// embedded interpreter lives as long as this object.
static python_interpreter interpreter;
// Handler for the Python execution tool: runs args["code"] through the global
// interpreter and returns the collected text as a one-element content array.
// Throws mcp::mcp_exception (invalid_params) when "code" is missing and
// (internal_error) when a std::exception escapes the execution step.
mcp::json python_execute_handler(const mcp::json& args) {
    const bool has_code = args.contains("code");
    if (!has_code) {
        // Dump the offending arguments before rejecting the call.
        std::cout << args.dump() << std::endl;
        throw mcp::mcp_exception(mcp::error_code::invalid_params, "缺少'code'参数");
    }

    try {
        // Run the code through the Python interpreter.
        mcp::json execution = interpreter.forward(args);

        // Concatenate whichever parts are present, in this fixed order.
        std::string text;
        for (const char* field : {"output", "error", "warning"}) {
            if (execution.contains(field)) {
                text += execution[field].get<std::string>();
            }
        }

        if (text.empty()) {
            text = "Executed successfully but w/o output. Maybe you should print more information.";
        }

        return {{
            {"type", "text"},
            {"text", text}
        }};
    } catch (const std::exception& failure) {
        throw mcp::mcp_exception(mcp::error_code::internal_error,
                                 "执行Python代码失败: " + std::string(failure.what()));
    }
}
// Registers the "PythonExecute" tool, backed by python_execute_handler, on the given server.
void register_python_execute_tool(mcp::server& server) {
    // Describe the tool: a single string parameter named "code"
    // (the trailing `true` presumably marks it as required - confirm in mcp_tool.h).
    mcp::tool execute_tool = mcp::tool_builder("PythonExecute")
                                 .with_description("执行Python代码并返回结果")
                                 .with_string_param("code", "要执行的Python代码", true)
                                 .build();

    server.register_tool(execute_tool, python_execute_handler);
}