/**
 * @file python_execute.cpp
 * @brief Python execution tool implementation
 *
 * This file implements the Python execution tool, using Python.h to directly
 * call the Python interpreter.
 */

#include "mcp_server.h"
#include "mcp_tool.h"
#include "mcp_resource.h"

#include <exception>
#include <iostream>
#include <memory>
#include <mutex>
#include <sstream>
#include <stdexcept>
#include <string>

// Check if Python is found
#ifdef PYTHON_FOUND
#include <Python.h>
#endif

namespace {

/**
 * @brief Pick the expression suggested in the "No output" warning
 *
 * Heuristic only: takes the last non-blank line of the code, keeps the last
 * ';'-separated statement on it and, when that statement contains an
 * assignment, keeps the text to the right of the first assignment operator.
 *
 * @param code Python source that produced no output
 * @return Expression text to place inside the suggested print(...) call
 *         (empty when the code has no non-blank line)
 */
std::string suggest_print_expression(const std::string& code) {
    // Remember the last line that has visible content. Reusing a single
    // getline() buffer would leave it empty whenever the code ends with '\n'.
    std::string statement;
    std::istringstream code_stream(code);
    for (std::string line; std::getline(code_stream, line);) {
        if (line.find_first_not_of(" \t\r") != std::string::npos) {
            statement = line;
        }
    }

    // Drop trailing blanks and trailing statement terminators
    const std::size_t last = statement.find_last_not_of(" \t\r;");
    if (last == std::string::npos) {
        return std::string();
    }
    statement.erase(last + 1);

    // Only the last statement on the line is of interest
    std::size_t start = statement.find_last_of(';');
    start = (start == std::string::npos) ? 0 : start + 1;

    // Move past the first assignment operator, ignoring ==, !=, <= and >=
    const std::string comparison_prefixes = "=!<>";
    for (std::size_t i = start; i < statement.size(); ++i) {
        if (statement[i] != '=') {
            continue;
        }
        if (i + 1 < statement.size() && statement[i + 1] == '=') {
            ++i;  // skip both characters of "=="
            continue;
        }
        if (i > start && comparison_prefixes.find(statement[i - 1]) != std::string::npos) {
            continue;
        }
        start = i + 1;
        break;
    }

    start = statement.find_first_not_of(" \t", start);
    return start == std::string::npos ? std::string() : statement.substr(start);
}

} // namespace

/**
 * @class python_interpreter
 * @brief Python interpreter class for executing Python code
 *
 * Initializes the embedded interpreter on construction and finalizes it on
 * destruction. Calls to forward() are serialized by an internal mutex, and
 * each call acquires the GIL for its duration.
 */
class python_interpreter {
private:
    // Mutex to ensure thread safety of Python interpreter
    mutable std::mutex py_mutex;
    bool is_initialized;

#ifdef PYTHON_FOUND
    // Thread state handed back by PyEval_SaveThread() in the constructor.
    // It must be restored before Py_Finalize(), which requires the GIL.
    PyThreadState* main_thread_state = nullptr;

    // Owning PyObject reference; releases it with Py_DECREF
    struct py_decref {
        void operator()(PyObject* obj) const noexcept { Py_DECREF(obj); }
    };
    using py_ref = std::unique_ptr<PyObject, py_decref>;

    // Holds the GIL for the lifetime of the object
    class gil_guard {
    public:
        gil_guard() : state(PyGILState_Ensure()) {}
        ~gil_guard() { PyGILState_Release(state); }
        gil_guard(const gil_guard&) = delete;
        gil_guard& operator=(const gil_guard&) = delete;

    private:
        PyGILState_STATE state;
    };

    /**
     * @brief Read the text accumulated in an io.StringIO object
     * @param string_io Borrowed reference to the StringIO object
     * @return Captured text as UTF-8, or an empty string if it cannot be read
     */
    static std::string read_captured(PyObject* string_io) {
        std::string text;
        py_ref value(PyObject_CallMethod(string_io, "getvalue", nullptr));
        if (value && PyUnicode_Check(value.get())) {
            // PyUnicode_AsUTF8 returns NULL (with an exception set) when the
            // text cannot be encoded, e.g. when it contains lone surrogates
            if (const char* utf8 = PyUnicode_AsUTF8(value.get())) {
                text = utf8;
            }
        }
        PyErr_Clear();
        return text;
    }

    /**
     * @brief Run code in the __main__ namespace with stdout/stderr captured
     *
     * The caller must hold the GIL.
     *
     * @param code Python source to execute
     * @return JSON object with "output" and/or "error", or a "warning" when
     *         the code produced neither
     */
    static mcp::json run_code(const std::string& code) {
        // Get main module and dictionary (both are borrowed references)
        PyObject* main_module = PyImport_AddModule("__main__");
        if (!main_module) {
            PyErr_Clear();
            return mcp::json{{"error", "Failed to get Python main module"}};
        }

        PyObject* main_dict = PyModule_GetDict(main_module);
        if (!main_dict) {
            PyErr_Clear();
            return mcp::json{{"error", "Failed to get Python main module dictionary"}};
        }

        // Import io module and get StringIO class
        py_ref io_module(PyImport_ImportModule("io"));
        if (!io_module) {
            PyErr_Print();
            return mcp::json{{"error", "Failed to import io module"}};
        }

        py_ref string_io(PyObject_GetAttrString(io_module.get(), "StringIO"));
        if (!string_io) {
            PyErr_Print();
            return mcp::json{{"error", "Failed to get StringIO class"}};
        }

        // Create StringIO objects
        py_ref captured_out(PyObject_CallObject(string_io.get(), nullptr));
        if (!captured_out) {
            PyErr_Print();
            return mcp::json{{"error", "Failed to create stdout StringIO object"}};
        }

        py_ref captured_err(PyObject_CallObject(string_io.get(), nullptr));
        if (!captured_err) {
            PyErr_Print();
            return mcp::json{{"error", "Failed to create stderr StringIO object"}};
        }

        // Save original stdout and stderr. PySys_GetObject returns borrowed
        // references, so take ownership before sys.* is overwritten.
        PyObject* borrowed_stdout = PySys_GetObject("stdout");
        PyObject* borrowed_stderr = PySys_GetObject("stderr");
        Py_XINCREF(borrowed_stdout);
        Py_XINCREF(borrowed_stderr);
        py_ref old_stdout(borrowed_stdout);
        py_ref old_stderr(borrowed_stderr);

        // Replace sys.stdout and sys.stderr
        const bool stdout_redirected = PySys_SetObject("stdout", captured_out.get()) == 0;
        const bool stderr_redirected =
            stdout_redirected && PySys_SetObject("stderr", captured_err.get()) == 0;
        if (!stderr_redirected) {
            PyErr_Print();
            // Undo a half-applied redirection so later calls see the real stdout
            if (stdout_redirected && old_stdout) {
                PySys_SetObject("stdout", old_stdout.get());
            }
            return mcp::json{{"error", "Failed to set stdout/stderr redirection"}};
        }

        // Restores the original streams on every exit path, including when a
        // C++ exception is thrown below
        struct stream_restorer {
            PyObject* out;
            PyObject* err;
            ~stream_restorer() {
                if (out) {
                    PySys_SetObject("stdout", out);
                }
                if (err) {
                    PySys_SetObject("stderr", err);
                }
            }
        } restore_streams{old_stdout.get(), old_stderr.get()};

        // Execute Python code
        std::string exit_note;
        py_ref run_result(PyRun_String(code.c_str(), Py_file_input, main_dict, main_dict));
        if (!run_result) {
            if (PyErr_ExceptionMatches(PyExc_SystemExit)) {
                // PyErr_Print() would terminate the whole process on
                // SystemExit, so report it instead of printing it
                PyErr_Clear();
                exit_note = "SystemExit raised by the executed code\n";
            } else {
                // The traceback goes to the redirected sys.stderr
                PyErr_Print();
            }
        }

        // Get output and errors
        const std::string output = read_captured(captured_out.get());
        const std::string error = read_captured(captured_err.get()) + exit_note;

        // Prepare JSON output
        mcp::json result_json;
        if (!output.empty()) {
            result_json["output"] = output;
        }
        if (!error.empty()) {
            result_json["error"] = error;
        }
        if (result_json.empty()) {
            result_json["warning"] =
                "No output. Maybe try with print(" + suggest_print_expression(code) + ")?";
        }
        return result_json;
    }
#endif

public:
    /**
     * @brief Constructor, initializes Python interpreter
     *
     * After initialization the GIL is released so that forward() can acquire
     * it from any thread.
     */
    python_interpreter() : is_initialized(false) {
#ifdef PYTHON_FOUND
        try {
            Py_Initialize();
            if (Py_IsInitialized()) {
                is_initialized = true;
                main_thread_state = PyEval_SaveThread();
            } else {
                std::cerr << "Failed to initialize Python interpreter" << '\n';
            }
        } catch (const std::exception& e) {
            std::cerr << "Python interpreter initialization exception: " << e.what() << '\n';
        }
#endif
    }

    /**
     * @brief Destructor, releases Python interpreter
     */
    ~python_interpreter() {
#ifdef PYTHON_FOUND
        if (is_initialized) {
            std::lock_guard<std::mutex> lock(py_mutex);
            // Py_Finalize() must run with the GIL held; take back the thread
            // state that the constructor released
            if (main_thread_state) {
                PyEval_RestoreThread(main_thread_state);
                main_thread_state = nullptr;
            }
            Py_Finalize();
            is_initialized = false;
        }
#endif
    }

    /**
     * @brief Execute Python code
     * @param input JSON object containing Python code in its "code" string field
     * @return JSON object with execution results: "output" (captured stdout),
     *         "error" (captured stderr or a failure description) and/or
     *         "warning" (when the code printed nothing)
     */
    mcp::json forward(const mcp::json& input) const {
#ifdef PYTHON_FOUND
        if (!is_initialized) {
            return mcp::json{{"error", "Python interpreter not properly initialized"}};
        }

        // Serialize callers, then acquire the GIL. Declaration order matters:
        // the GIL is released before the mutex is unlocked.
        std::lock_guard<std::mutex> lock(py_mutex);
        gil_guard gil;

        mcp::json result_json;
        try {
            if (input.contains("code") && input["code"].is_string()) {
                result_json = run_code(input["code"].get<std::string>());
            } else {
                result_json["error"] = "Invalid parameters or code not provided";
            }
        } catch (const std::exception& e) {
            result_json["error"] = std::string("Python execution exception: ") + e.what();
        }
        return result_json;
#else
        return mcp::json{{"error", "Python interpreter not available"}};
#endif
    }
};

// Global Python interpreter instance
static python_interpreter interpreter;

/**
 * @brief Python execution tool handler function
 * @param args Tool arguments; must contain a "code" entry
 * @return JSON array holding one text content item with the pretty-printed
 *         execution result
 * @throws mcp::mcp_exception invalid_params when "code" is missing,
 *         internal_error when execution or serialization throws
 */
mcp::json python_execute_handler(const mcp::json& args) {
    if (!args.contains("code")) {
        throw mcp::mcp_exception(mcp::error_code::invalid_params, "Missing 'code' parameter");
    }

    try {
        // Use Python interpreter to execute code
        mcp::json result = interpreter.forward(args);

        return {{
            {"type", "text"},
            {"text", result.dump(2)}
        }};
    } catch (const std::exception& e) {
        throw mcp::mcp_exception(mcp::error_code::internal_error,
                                 "Failed to execute Python code: " + std::string(e.what()));
    }
}

/**
 * @brief Register the PythonExecute tool
 * @param server Server on which the "python_execute" tool is registered
 */
void register_python_execute_tool(mcp::server& server) {
    mcp::tool python_tool = mcp::tool_builder("python_execute")
        .with_description("Execute Python code and return the result")
        .with_string_param("code", "The Python code to execute", true)
        .build();

    server.register_tool(python_tool, python_execute_handler);
}