// humanus.cpp/server/python_execute.cpp
//
// 280 lines
// 9.7 KiB
// C++

/**
* @file python_execute.cpp
* @brief Python execution tool implementation
*
* This file implements the Python execution tool, using Python.h to directly call the Python interpreter.
*/
#include "mcp_server.h"
#include "mcp_tool.h"
#include "mcp_resource.h"
#include <iostream>
#include <string>
#include <memory>
#include <stdexcept>
#include <mutex>
// Check if Python is found
#ifdef PYTHON_FOUND
#include <Python.h>
#endif
/**
 * @class python_interpreter
 * @brief Embeds a CPython interpreter and runs code snippets in its `__main__` namespace.
 *
 * When PYTHON_FOUND is defined, construction initializes the interpreter and
 * releases the GIL, forward() executes code while capturing what it writes to
 * sys.stdout / sys.stderr, and destruction finalizes the interpreter.
 * Without PYTHON_FOUND the class is a stub whose forward() only reports that
 * Python is unavailable.
 */
class python_interpreter {
private:
    // Serializes forward() calls and interpreter teardown.
    mutable std::mutex py_mutex;
    bool is_initialized;

#ifdef PYTHON_FOUND
    // Thread state returned by PyEval_SaveThread() in the constructor. It has
    // to be restored before Py_Finalize(), which requires a current thread
    // state (and therefore the GIL).
    PyThreadState* saved_thread_state = nullptr;

    /// Owns one strong reference to a Python object and drops it on scope exit.
    class py_ref {
    public:
        explicit py_ref(PyObject* object = nullptr) : object_(object) {}
        ~py_ref() { Py_XDECREF(object_); }
        py_ref(const py_ref&) = delete;
        py_ref& operator=(const py_ref&) = delete;
        PyObject* get() const { return object_; }
        explicit operator bool() const { return object_ != nullptr; }
    private:
        PyObject* object_;
    };

    /// Holds the GIL from construction until destruction.
    class gil_guard {
    public:
        gil_guard() : state_(PyGILState_Ensure()) {}
        ~gil_guard() { PyGILState_Release(state_); }
        gil_guard(const gil_guard&) = delete;
        gil_guard& operator=(const gil_guard&) = delete;
    private:
        PyGILState_STATE state_;
    };

    /**
     * @brief Read the text accumulated in an io.StringIO object.
     * @param buffer Borrowed reference to the StringIO instance.
     * @return The buffer contents as UTF-8, or an empty string when the value
     *         cannot be fetched or encoded.
     */
    static std::string string_io_value(PyObject* buffer) {
        const py_ref value(PyObject_CallMethod(buffer, "getvalue", nullptr));
        if (!value) {
            PyErr_Clear();
            return std::string();
        }
        if (!PyUnicode_Check(value.get())) {
            return std::string();
        }
        Py_ssize_t length = 0;
        const char* utf8 = PyUnicode_AsUTF8AndSize(value.get(), &length);
        if (!utf8) {
            // Encoding can fail (e.g. lone surrogates); constructing a
            // std::string from nullptr would be undefined behaviour.
            PyErr_Clear();
            return std::string();
        }
        // Use the explicit length so embedded NUL characters are preserved.
        return std::string(utf8, static_cast<std::size_t>(length));
    }
#endif

    /**
     * @brief Pick the expression to suggest in the "try print(...)" hint.
     *
     * Takes the last non-blank line of @p code, keeps the part after the last
     * top-level `;` and after the last top-level assignment `=`, and trims
     * leading blanks. Text inside quotes or brackets is not split, and the
     * comparison operators `==`, `!=`, `<=`, `>=` are not treated as
     * assignments. This is a best-effort heuristic, not a Python parser.
     *
     * @param code The submitted Python source.
     * @return The suggested expression; empty when @p code has no content.
     */
    static std::string print_hint_expression(const std::string& code) {
        // Ignore trailing whitespace and statement terminators so that code
        // ending in "\n" or ";" still yields its last real line.
        const std::size_t last = code.find_last_not_of(" \t\r\n;");
        if (last == std::string::npos) {
            return std::string();
        }
        const std::size_t newline = code.rfind('\n', last);
        const std::size_t line_begin = (newline == std::string::npos) ? 0 : newline + 1;
        const std::string line = code.substr(line_begin, last - line_begin + 1);

        std::size_t expr_begin = 0;
        int depth = 0;      // nesting level of (), [] and {}
        char quote = '\0';  // active string delimiter, or '\0' outside strings
        for (std::size_t i = 0; i < line.size(); ++i) {
            const char c = line[i];
            if (quote != '\0') {
                if (c == '\\') {
                    ++i;  // skip the escaped character
                } else if (c == quote) {
                    quote = '\0';
                }
                continue;
            }
            switch (c) {
            case '"':
            case '\'':
                quote = c;
                break;
            case '(':
            case '[':
            case '{':
                ++depth;
                break;
            case ')':
            case ']':
            case '}':
                --depth;
                break;
            case ';':
                if (depth == 0) {
                    expr_begin = i + 1;
                }
                break;
            case '=':
                if (depth != 0) {
                    break;  // keyword argument or similar, not an assignment
                }
                if (i + 1 < line.size() && line[i + 1] == '=') {
                    ++i;  // "==" comparison
                    break;
                }
                if (i > 0 && (line[i - 1] == '!' || line[i - 1] == '<' || line[i - 1] == '>')) {
                    break;  // "!=", "<=", ">="
                }
                expr_begin = i + 1;
                break;
            default:
                break;
            }
        }
        const std::size_t first = line.find_first_not_of(" \t", expr_begin);
        return first == std::string::npos ? std::string() : line.substr(first);
    }

public:
    /**
     * @brief Constructor, initializes the Python interpreter.
     *
     * On success the GIL is released right away so that forward() can
     * re-acquire it through PyGILState_Ensure().
     */
    python_interpreter() : is_initialized(false) {
#ifdef PYTHON_FOUND
        Py_Initialize();
        if (!Py_IsInitialized()) {
            std::cerr << "Failed to initialize Python interpreter" << std::endl;
            return;
        }
        is_initialized = true;
        saved_thread_state = PyEval_SaveThread();
#endif
    }

    python_interpreter(const python_interpreter&) = delete;
    python_interpreter& operator=(const python_interpreter&) = delete;

    /**
     * @brief Destructor, finalizes the Python interpreter.
     */
    ~python_interpreter() {
#ifdef PYTHON_FOUND
        if (is_initialized) {
            std::lock_guard<std::mutex> lock(py_mutex);
            // Re-attach the thread state released in the constructor;
            // Py_Finalize() must not run without one.
            if (saved_thread_state) {
                PyEval_RestoreThread(saved_thread_state);
                saved_thread_state = nullptr;
            }
            Py_Finalize();
            is_initialized = false;
        }
#endif
    }

    /**
     * @brief Execute Python code
     * @param input JSON object whose string member "code" holds the source to run
     * @return JSON object with "output" (captured stdout) and/or "error"
     *         (captured stderr or a failure description); when both would be
     *         empty, a "warning" suggesting a print() call instead.
     */
    mcp::json forward(const mcp::json& input) const {
#ifdef PYTHON_FOUND
        if (!is_initialized) {
            return mcp::json{{"error", "Python interpreter not properly initialized"}};
        }
        // Declaration order matters: the Python references created below are
        // released before the GIL, and the GIL before the mutex.
        std::lock_guard<std::mutex> lock(py_mutex);
        const gil_guard gil;
        mcp::json result_json;
        try {
            if (!input.contains("code") || !input["code"].is_string()) {
                result_json["error"] = "Invalid parameters or code not provided";
                return result_json;
            }
            const std::string code = input["code"].get<std::string>();

            // Both are borrowed references; nothing to release.
            PyObject* main_module = PyImport_AddModule("__main__");
            if (!main_module) {
                PyErr_Print();
                return mcp::json{{"error", "Failed to get Python main module"}};
            }
            PyObject* main_dict = PyModule_GetDict(main_module);
            if (!main_dict) {
                return mcp::json{{"error", "Failed to get Python main module dictionary"}};
            }

            const py_ref io_module(PyImport_ImportModule("io"));
            if (!io_module) {
                PyErr_Print();
                return mcp::json{{"error", "Failed to import io module"}};
            }
            const py_ref string_io(PyObject_GetAttrString(io_module.get(), "StringIO"));
            if (!string_io) {
                PyErr_Print();
                return mcp::json{{"error", "Failed to get StringIO class"}};
            }
            const py_ref captured_out(PyObject_CallObject(string_io.get(), nullptr));
            if (!captured_out) {
                PyErr_Print();
                return mcp::json{{"error", "Failed to create stdout StringIO object"}};
            }
            const py_ref captured_err(PyObject_CallObject(string_io.get(), nullptr));
            if (!captured_err) {
                PyErr_Print();
                return mcp::json{{"error", "Failed to create stderr StringIO object"}};
            }

            // PySys_GetObject() returns borrowed references; take strong ones
            // so the original streams survive being replaced in sys.
            PyObject* const borrowed_stdout = PySys_GetObject("stdout");
            PyObject* const borrowed_stderr = PySys_GetObject("stderr");
            Py_XINCREF(borrowed_stdout);
            Py_XINCREF(borrowed_stderr);
            const py_ref old_stdout(borrowed_stdout);
            const py_ref old_stderr(borrowed_stderr);

            const bool stdout_redirected = PySys_SetObject("stdout", captured_out.get()) == 0;
            const bool stderr_redirected =
                stdout_redirected && PySys_SetObject("stderr", captured_err.get()) == 0;
            if (!stderr_redirected) {
                PyErr_Print();
                // Do not leave sys.stdout pointing at the temporary buffer.
                if (stdout_redirected && old_stdout) {
                    PySys_SetObject("stdout", old_stdout.get());
                }
                return mcp::json{{"error", "Failed to set stdout/stderr redirection"}};
            }

            bool exit_suppressed = false;
            {
                const py_ref run_result(
                    PyRun_String(code.c_str(), Py_file_input, main_dict, main_dict));
                if (!run_result) {
                    if (PyErr_ExceptionMatches(PyExc_SystemExit)) {
                        // PyErr_Print() would terminate the whole process on
                        // SystemExit; swallow it and report it instead.
                        PyErr_Clear();
                        exit_suppressed = true;
                    } else {
                        // The traceback lands in the captured stderr buffer.
                        PyErr_Print();
                    }
                }
            }

            // Restore the streams before any C++ operation that may throw.
            if (old_stdout) {
                PySys_SetObject("stdout", old_stdout.get());
            }
            if (old_stderr) {
                PySys_SetObject("stderr", old_stderr.get());
            }

            const std::string output = string_io_value(captured_out.get());
            std::string error = string_io_value(captured_err.get());
            if (exit_suppressed) {
                if (!error.empty() && error.back() != '\n') {
                    error += '\n';
                }
                error += "SystemExit raised by the executed code was suppressed";
            }

            if (!output.empty()) {
                result_json["output"] = output;
            }
            if (!error.empty()) {
                result_json["error"] = error;
            }
            if (result_json.empty()) {
                result_json["warning"] =
                    "No output. Maybe try with print(" + print_hint_expression(code) + ")?";
            }
        } catch (const std::exception& e) {
            result_json["error"] = std::string("Python execution exception: ") + e.what();
        }
        return result_json;
#else
        return mcp::json{{"error", "Python interpreter not available"}};
#endif
    }
};
// Single interpreter instance used by the tool handler in this file; the
// unnamed namespace gives it internal linkage.
namespace {
python_interpreter interpreter;
}  // namespace
/**
 * @brief Tool handler: runs the Python code carried in the request arguments.
 *
 * @param args Request arguments; must contain a "code" member.
 * @return A one-element list holding a text item whose "text" is the
 *         interpreter's result JSON dumped with an indent of 2.
 * @throws mcp::mcp_exception invalid_params when "code" is absent, or
 *         internal_error when a std::exception escapes the execution.
 */
mcp::json python_execute_handler(const mcp::json& args) {
    const bool has_code = args.contains("code");
    if (!has_code) {
        throw mcp::mcp_exception(mcp::error_code::invalid_params, "Missing 'code' parameter");
    }

    try {
        // Delegate to the shared interpreter, then serialize its result.
        const mcp::json execution = interpreter.forward(args);
        const std::string rendered = execution.dump(2);
        return {{
            {"type", "text"},
            {"text", rendered}
        }};
    } catch (const std::exception& failure) {
        const std::string reason = failure.what();
        throw mcp::mcp_exception(mcp::error_code::internal_error,
                                 "Failed to execute Python code: " + reason);
    }
}
/**
 * @brief Register the `python_execute` tool on an MCP server.
 *
 * Builds a tool definition named "python_execute" with a description and a
 * single string parameter "code", then registers it on @p server with
 * python_execute_handler as its handler.
 *
 * @param server Server on which the tool is registered.
 */
void register_python_execute_tool(mcp::server& server) {
mcp::tool python_tool = mcp::tool_builder("python_execute")
.with_description("Execute Python code and return the result")
// NOTE(review): the trailing `true` presumably marks "code" as required — confirm against mcp::tool_builder.
.with_string_param("code", "The Python code to execute", true)
.build();
server.register_tool(python_tool, python_execute_handler);
}