python_execute: fix bugs in locks
parent
9109b85e1a
commit
6c746359aa
|
@ -1,18 +1,13 @@
|
|||
# 服务器组件CMakeLists.txt
|
||||
cmake_minimum_required(VERSION 3.10)
|
||||
|
||||
# 查找必要的包
|
||||
find_package(Threads REQUIRED)
|
||||
|
||||
# 添加源文件
|
||||
set(SERVER_SOURCES
|
||||
python_execute.cpp
|
||||
)
|
||||
|
||||
# 添加库
|
||||
add_library(server STATIC ${SERVER_SOURCES})
|
||||
|
||||
# 链接依赖库
|
||||
target_link_libraries(server PRIVATE mcp)
|
||||
|
||||
find_package(Python3 COMPONENTS Development)
|
||||
|
@ -26,7 +21,6 @@ else()
|
|||
message(WARNING "Python3 development libraries not found. Python interpreter will not be available.")
|
||||
endif()
|
||||
|
||||
# 包含目录
|
||||
target_include_directories(server PRIVATE
|
||||
${CMAKE_CURRENT_SOURCE_DIR}
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/../
|
||||
|
@ -34,8 +28,7 @@ target_include_directories(server PRIVATE
|
|||
${CMAKE_CURRENT_SOURCE_DIR}/../mcp/common
|
||||
)
|
||||
|
||||
# 添加MCP服务器可执行文件
|
||||
add_executable(mcp_server mcp_server_main.cpp)
|
||||
add_executable(mcp_server mcp_tool_server.cpp)
|
||||
target_link_libraries(mcp_server PRIVATE server mcp Threads::Threads ${OPENSSL_LIBRARIES})
|
||||
target_include_directories(mcp_server PRIVATE
|
||||
${CMAKE_CURRENT_SOURCE_DIR}
|
||||
|
@ -45,8 +38,4 @@ target_include_directories(mcp_server PRIVATE
|
|||
)
|
||||
if(Python3_FOUND)
|
||||
target_link_libraries(mcp_server PRIVATE ${Python3_LIBRARIES})
|
||||
endif()
|
||||
|
||||
# 安装
|
||||
install(TARGETS server DESTINATION lib)
|
||||
install(TARGETS mcp_server DESTINATION bin)
|
||||
endif()
|
|
@ -19,7 +19,8 @@
|
|||
extern void register_python_execute_tool(mcp::server& server);
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
int port;
|
||||
int port = 8895;
|
||||
|
||||
if (argc == 2) {
|
||||
try {
|
||||
port = std::stoi(argv[1]);
|
||||
|
@ -27,13 +28,11 @@ int main(int argc, char* argv[]) {
|
|||
std::cerr << "Invalid port number: " << argv[1] << std::endl;
|
||||
return 1;
|
||||
}
|
||||
} else {
|
||||
port = 8818;
|
||||
}
|
||||
|
||||
// Create and configure server
|
||||
mcp::server server("localhost", port);
|
||||
server.set_server_info("HumanusMCPServer", "0.0.1");
|
||||
server.set_server_info("humanus_tool", "0.1.0");
|
||||
|
||||
// Set server capabilities
|
||||
mcp::json capabilities = {
|
|
@ -1,6 +1,5 @@
|
|||
/**
|
||||
* @file python_execute.cpp
|
||||
* @brief Python execution tool implementation
|
||||
*
|
||||
* This file implements the Python execution tool, using Python.h to directly call the Python interpreter.
|
||||
*/
|
||||
|
@ -14,7 +13,10 @@
|
|||
#include <memory>
|
||||
#include <stdexcept>
|
||||
#include <mutex>
|
||||
#include <shared_mutex>
|
||||
#include <unordered_map>
|
||||
#include <chrono>
|
||||
#include <future>
|
||||
|
||||
// Check if Python is found
|
||||
#ifdef PYTHON_FOUND
|
||||
|
@ -27,12 +29,14 @@
|
|||
*/
|
||||
class python_interpreter {
|
||||
private:
|
||||
// Mutex to ensure thread safety of Python interpreter
|
||||
mutable std::mutex py_mutex;
|
||||
mutable std::shared_mutex py_mutex;
|
||||
bool is_initialized;
|
||||
|
||||
// Map to store Python thread states for each session
|
||||
mutable std::unordered_map<std::string, PyThreadState*> session_states;
|
||||
|
||||
// Default timeout (milliseconds)
|
||||
static constexpr unsigned int DEFAULT_TIMEOUT_MS = 30000; // 30 seconds
|
||||
|
||||
public:
|
||||
/**
|
||||
|
@ -41,6 +45,7 @@ public:
|
|||
python_interpreter() : is_initialized(false) {
|
||||
#ifdef PYTHON_FOUND
|
||||
try {
|
||||
std::unique_lock<std::shared_mutex> lock(py_mutex);
|
||||
Py_Initialize();
|
||||
if (Py_IsInitialized()) {
|
||||
is_initialized = true;
|
||||
|
@ -62,7 +67,7 @@ public:
|
|||
~python_interpreter() {
|
||||
#ifdef PYTHON_FOUND
|
||||
if (is_initialized) {
|
||||
std::lock_guard<std::mutex> lock(py_mutex);
|
||||
std::unique_lock<std::shared_mutex> lock(py_mutex);
|
||||
// Clean up all session states
|
||||
for (auto& pair : session_states) {
|
||||
PyThreadState_Swap(pair.second);
|
||||
|
@ -78,45 +83,86 @@ public:
|
|||
}
|
||||
|
||||
/**
|
||||
* @brief Get or create a thread state for a session
|
||||
* @brief Check if a session exists
|
||||
* @param session_id The session identifier
|
||||
* @return PyThreadState for the session
|
||||
* @return bool indicating if session exists
|
||||
*/
|
||||
#ifdef PYTHON_FOUND
|
||||
PyThreadState* get_session_state(const std::string& session_id) const {
|
||||
std::lock_guard<std::mutex> lock(py_mutex);
|
||||
|
||||
// Check if session already exists
|
||||
bool has_session(const std::string& session_id) const {
|
||||
std::shared_lock<std::shared_mutex> lock(py_mutex);
|
||||
return session_states.find(session_id) != session_states.end();
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Get an existing thread state for a session
|
||||
* @param session_id The session identifier
|
||||
* @return PyThreadState for the session or nullptr if not found
|
||||
*/
|
||||
PyThreadState* get_existing_session_state(const std::string& session_id) const {
|
||||
std::shared_lock<std::shared_mutex> lock(py_mutex);
|
||||
auto it = session_states.find(session_id);
|
||||
if (it != session_states.end()) {
|
||||
return it->second;
|
||||
}
|
||||
|
||||
// Create new thread state for this session
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Create a new thread state for a session
|
||||
* @param session_id The session identifier
|
||||
* @return PyThreadState for the new session
|
||||
*/
|
||||
PyThreadState* create_session_state(const std::string& session_id) const {
|
||||
std::unique_lock<std::shared_mutex> lock(py_mutex);
|
||||
if (session_states.count(session_id)) return session_states[session_id];
|
||||
|
||||
PyThreadState* new_state = Py_NewInterpreter();
|
||||
if (!new_state) {
|
||||
throw std::runtime_error("Failed to create new Python interpreter for session" + session_id);
|
||||
throw std::runtime_error("Failed to create new Python interpreter for session " + session_id);
|
||||
}
|
||||
|
||||
// Store and return the new state
|
||||
PyEval_SaveThread(); // Release GIL after creation
|
||||
session_states[session_id] = new_state;
|
||||
return new_state;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @brief Get or create a thread state for a session
|
||||
* @param session_id The session identifier
|
||||
* @return PyThreadState for the session
|
||||
*/
|
||||
PyThreadState* get_session_state(const std::string& session_id) const {
|
||||
// Try to get existing session state with read lock
|
||||
PyThreadState* state = get_existing_session_state(session_id);
|
||||
if (state) {
|
||||
return state;
|
||||
}
|
||||
|
||||
// If it doesn't exist, create a new session (will use write lock)
|
||||
return create_session_state(session_id);
|
||||
}
|
||||
#endif
|
||||
|
||||
/**
|
||||
* @brief Execute Python code in the context of a specific session
|
||||
* @brief Execute Python code in the context of a specific session with timeout
|
||||
* @param input JSON object containing Python code
|
||||
* @param session_id The session identifier
|
||||
* @return JSON object with execution results
|
||||
*/
|
||||
mcp::json forward(const mcp::json& input, const std::string& session_id) const {
|
||||
mcp::json forward(const mcp::json& input, const std::string& session_id) {
|
||||
#ifdef PYTHON_FOUND
|
||||
if (!is_initialized) {
|
||||
return mcp::json{{"error", "Python interpreter not properly initialized"}};
|
||||
}
|
||||
if (!is_initialized) {
|
||||
return mcp::json{{"error", "Python interpreter not properly initialized"}};
|
||||
}
|
||||
|
||||
unsigned int timeout_ms = DEFAULT_TIMEOUT_MS;
|
||||
if (input.contains("timeout_ms") && input["timeout_ms"].is_number()) {
|
||||
timeout_ms = input["timeout_ms"].get<unsigned int>();
|
||||
}
|
||||
|
||||
std::packaged_task<mcp::json()> task([this, &input, session_id]() {
|
||||
mcp::json thread_result;
|
||||
|
||||
// Get or create session thread state
|
||||
PyThreadState* tstate = nullptr;
|
||||
try {
|
||||
tstate = get_session_state(session_id);
|
||||
|
@ -124,175 +170,73 @@ public:
|
|||
return mcp::json{{"error", e.what()}};
|
||||
}
|
||||
|
||||
PyThreadState* old_state = PyThreadState_Swap(tstate);
|
||||
|
||||
mcp::json result_json;
|
||||
|
||||
PyEval_RestoreThread(tstate);
|
||||
|
||||
try {
|
||||
if (input.contains("code") && input["code"].is_string()) {
|
||||
std::string code = input["code"].get<std::string>();
|
||||
|
||||
// Get main module and dictionary
|
||||
PyObject *main_module = PyImport_AddModule("__main__");
|
||||
if (!main_module) {
|
||||
PyThreadState_Swap(old_state);
|
||||
return mcp::json{{"error", "Failed to get Python main module"}};
|
||||
}
|
||||
|
||||
PyObject *main_dict = PyModule_GetDict(main_module);
|
||||
if (!main_dict) {
|
||||
PyThreadState_Swap(old_state);
|
||||
return mcp::json{{"error", "Failed to get Python main module dictionary"}};
|
||||
}
|
||||
|
||||
// Import sys and io modules
|
||||
PyObject *sys_module = PyImport_ImportModule("sys");
|
||||
if (!sys_module) {
|
||||
PyErr_Print();
|
||||
PyThreadState_Swap(old_state);
|
||||
return mcp::json{{"error", "Failed to import sys module"}};
|
||||
}
|
||||
|
||||
PyObject *io_module = PyImport_ImportModule("io");
|
||||
if (!io_module) {
|
||||
Py_DECREF(sys_module);
|
||||
PyErr_Print();
|
||||
PyThreadState_Swap(old_state);
|
||||
return mcp::json{{"error", "Failed to import io module"}};
|
||||
}
|
||||
|
||||
// Get StringIO class
|
||||
PyObject *string_io = PyObject_GetAttrString(io_module, "StringIO");
|
||||
if (!string_io) {
|
||||
Py_DECREF(io_module);
|
||||
Py_DECREF(sys_module);
|
||||
PyErr_Print();
|
||||
PyThreadState_Swap(old_state);
|
||||
return mcp::json{{"error", "Failed to get StringIO class"}};
|
||||
}
|
||||
|
||||
// Create StringIO objects
|
||||
PyObject *sys_stdout = PyObject_CallObject(string_io, nullptr);
|
||||
if (!sys_stdout) {
|
||||
Py_DECREF(string_io);
|
||||
Py_DECREF(io_module);
|
||||
Py_DECREF(sys_module);
|
||||
PyErr_Print();
|
||||
PyThreadState_Swap(old_state);
|
||||
return mcp::json{{"error", "Failed to create stdout StringIO object"}};
|
||||
}
|
||||
|
||||
PyObject *sys_stderr = PyObject_CallObject(string_io, nullptr);
|
||||
if (!sys_stderr) {
|
||||
Py_DECREF(sys_stdout);
|
||||
Py_DECREF(string_io);
|
||||
Py_DECREF(io_module);
|
||||
Py_DECREF(sys_module);
|
||||
PyErr_Print();
|
||||
PyThreadState_Swap(old_state);
|
||||
return mcp::json{{"error", "Failed to create stderr StringIO object"}};
|
||||
}
|
||||
|
||||
// Save original stdout and stderr
|
||||
PyObject *old_stdout = PySys_GetObject("stdout");
|
||||
PyObject *old_stderr = PySys_GetObject("stderr");
|
||||
|
||||
if (old_stdout) Py_INCREF(old_stdout);
|
||||
if (old_stderr) Py_INCREF(old_stderr);
|
||||
PySys_SetObject("stdout", sys_stdout);
|
||||
PySys_SetObject("stderr", sys_stderr);
|
||||
|
||||
// Replace sys.stdout and sys.stderr
|
||||
if (PySys_SetObject("stdout", sys_stdout) != 0 ||
|
||||
PySys_SetObject("stderr", sys_stderr) != 0) {
|
||||
Py_DECREF(sys_stderr);
|
||||
Py_DECREF(sys_stdout);
|
||||
Py_DECREF(string_io);
|
||||
Py_DECREF(io_module);
|
||||
Py_DECREF(sys_module);
|
||||
PyErr_Print();
|
||||
PyThreadState_Swap(old_state);
|
||||
return mcp::json{{"error", "Failed to set stdout/stderr redirection"}};
|
||||
}
|
||||
|
||||
// Execute Python code
|
||||
PyObject *result = PyRun_String(code.c_str(), Py_file_input, main_dict, main_dict);
|
||||
if (!result) {
|
||||
PyErr_Print();
|
||||
}
|
||||
if (!result) PyErr_Print();
|
||||
Py_XDECREF(result);
|
||||
|
||||
// Get output and errors
|
||||
PyObject *out_value = PyObject_CallMethod(sys_stdout, "getvalue", nullptr);
|
||||
PyObject *err_value = PyObject_CallMethod(sys_stderr, "getvalue", nullptr);
|
||||
|
||||
std::string output, error;
|
||||
|
||||
// Safely convert Python strings to C++ strings
|
||||
if (out_value && PyUnicode_Check(out_value)) {
|
||||
output = PyUnicode_AsUTF8(out_value);
|
||||
}
|
||||
|
||||
if (err_value && PyUnicode_Check(err_value)) {
|
||||
error = PyUnicode_AsUTF8(err_value);
|
||||
}
|
||||
std::string output = out_value && PyUnicode_Check(out_value) ? PyUnicode_AsUTF8(out_value) : "";
|
||||
std::string error = err_value && PyUnicode_Check(err_value) ? PyUnicode_AsUTF8(err_value) : "";
|
||||
|
||||
// Restore original stdout and stderr
|
||||
if (old_stdout) {
|
||||
PySys_SetObject("stdout", old_stdout);
|
||||
Py_DECREF(old_stdout);
|
||||
}
|
||||
|
||||
if (old_stderr) {
|
||||
PySys_SetObject("stderr", old_stderr);
|
||||
Py_DECREF(old_stderr);
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
Py_XDECREF(out_value);
|
||||
Py_XDECREF(err_value);
|
||||
Py_DECREF(sys_stdout);
|
||||
Py_DECREF(sys_stderr);
|
||||
Py_DECREF(string_io);
|
||||
Py_DECREF(io_module);
|
||||
Py_DECREF(sys_module);
|
||||
|
||||
// Prepare JSON output
|
||||
if (!output.empty()) {
|
||||
result_json["output"] = output;
|
||||
}
|
||||
if (!error.empty()) {
|
||||
result_json["error"] = error;
|
||||
}
|
||||
if (!output.empty()) thread_result["output"] = output;
|
||||
if (!error.empty()) thread_result["error"] = error;
|
||||
|
||||
if (result_json.empty()) {
|
||||
std::string last_line;
|
||||
std::istringstream code_stream(code);
|
||||
while (std::getline(code_stream, last_line, '\n')) {}
|
||||
size_t pos = last_line.find_last_of(';') + 1;
|
||||
pos = last_line.find("=") + 1;
|
||||
while (pos < last_line.size() && isblank(last_line[pos])) {
|
||||
pos++;
|
||||
}
|
||||
if (pos != std::string::npos) {
|
||||
last_line = last_line.substr(pos);
|
||||
}
|
||||
|
||||
result_json["warning"] = "No output. Maybe try with print(" + last_line + ")?";
|
||||
if (thread_result.empty()) {
|
||||
thread_result["warning"] = "No output generated. Consider using print statements.";
|
||||
}
|
||||
} else {
|
||||
result_json["error"] = "Invalid parameters or code not provided";
|
||||
thread_result["error"] = "Invalid parameters or code not provided";
|
||||
}
|
||||
} catch (const std::exception& e) {
|
||||
result_json["error"] = std::string("Python execution exception: ") + e.what();
|
||||
thread_result["error"] = std::string("Python execution exception: ") + e.what();
|
||||
}
|
||||
|
||||
// Restore previous thread state
|
||||
PyThreadState_Swap(old_state);
|
||||
|
||||
return result_json;
|
||||
tstate = PyEval_SaveThread(); // Save thread state and release GIL
|
||||
return thread_result;
|
||||
});
|
||||
|
||||
auto future = task.get_future();
|
||||
std::thread execution_thread(std::move(task));
|
||||
|
||||
if (future.wait_for(std::chrono::milliseconds(timeout_ms)) == std::future_status::timeout) {
|
||||
if (execution_thread.joinable()) execution_thread.detach(); // detach to prevent zombie threads
|
||||
return mcp::json{{"error", "Python execution timed out after " + std::to_string(timeout_ms) + "ms"}};
|
||||
}
|
||||
|
||||
if (execution_thread.joinable()) execution_thread.join(); // Join thread if it's still running
|
||||
|
||||
return future.get();
|
||||
#else
|
||||
return mcp::json{{"error", "Python interpreter not available"}};
|
||||
return mcp::json{{"error", "Python interpreter not available"}};
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @brief Clean up a session and remove its thread state
|
||||
|
@ -300,19 +244,18 @@ public:
|
|||
*/
|
||||
void cleanup_session(const std::string& session_id) {
|
||||
#ifdef PYTHON_FOUND
|
||||
std::lock_guard<std::mutex> lock(py_mutex);
|
||||
|
||||
std::unique_lock<std::shared_mutex> lock(py_mutex);
|
||||
auto it = session_states.find(session_id);
|
||||
if (it != session_states.end()) {
|
||||
PyThreadState* old_state = PyThreadState_Swap(it->second);
|
||||
PyThreadState_Clear(it->second);
|
||||
PyThreadState_Delete(it->second);
|
||||
PyThreadState_Swap(old_state);
|
||||
|
||||
PyThreadState* state = it->second;
|
||||
PyEval_RestoreThread(state);
|
||||
Py_EndInterpreter(state); // Properly end the child interpreter
|
||||
session_states.erase(it);
|
||||
PyEval_SaveThread(); // Restore the main thread's GIL release state
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
// Global Python interpreter instance
|
||||
|
@ -325,7 +268,7 @@ mcp::json python_execute_handler(const mcp::json& args, const std::string& sessi
|
|||
}
|
||||
|
||||
try {
|
||||
// Use Python interpreter to execute code with session context
|
||||
// Use Python interpreter to execute code with session context and timeout
|
||||
mcp::json result = interpreter.forward(args, session_id);
|
||||
|
||||
return {{
|
||||
|
@ -341,8 +284,9 @@ mcp::json python_execute_handler(const mcp::json& args, const std::string& sessi
|
|||
// Register the PythonExecute tool
|
||||
void register_python_execute_tool(mcp::server& server) {
|
||||
mcp::tool python_tool = mcp::tool_builder("python_execute")
|
||||
.with_description("Execute Python code and return the result")
|
||||
.with_string_param("code", "The Python code to execute", true)
|
||||
.with_description("Executes Python code string. Note: Only print outputs are visible, function return values are not captured. Use print statements to see results.")
|
||||
.with_string_param("code", "The Python code to execute. Note: Use absolute file paths if code will read/write files.", true)
|
||||
.with_number_param("timeout_ms", "Timeout in milliseconds for code execution (default: 30000)", false)
|
||||
.build();
|
||||
|
||||
server.register_tool(python_tool, python_execute_handler);
|
||||
|
|
Loading…
Reference in New Issue