stdio_client: fix win implementation

main
hkr04 2025-03-30 16:06:05 +08:00
parent 3fe1e049fb
commit ac8e520bdd
1 changed files with 78 additions and 35 deletions

View File

@ -9,7 +9,7 @@
#include "mcp_stdio_client.h" #include "mcp_stdio_client.h"
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32)
#include <windows.h> #include <windows.h>
#include <io.h> #include <io.h>
#else #else
@ -218,7 +218,7 @@ bool stdio_client::start_server_process() {
throw std::runtime_error("Unsupported type"); throw std::runtime_error("Unsupported type");
}; };
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32)
// Windows implementation // Windows implementation
SECURITY_ATTRIBUTES sa; SECURITY_ATTRIBUTES sa;
sa.nLength = sizeof(SECURITY_ATTRIBUTES); sa.nLength = sizeof(SECURITY_ATTRIBUTES);
@ -272,33 +272,34 @@ bool stdio_client::start_server_process() {
ZeroMemory(&pi, sizeof(PROCESS_INFORMATION)); ZeroMemory(&pi, sizeof(PROCESS_INFORMATION));
// Prepare environment variables // Add custom environment variables
std::string env_block;
if (!env_vars_.empty()) { if (!env_vars_.empty()) {
for (const auto& [key, value] : env_vars_.items()) { for (const auto& [key, value] : env_vars_.items()) {
std::string env_var = key + "=" + convert_to_string(value); std::string env_var_value = convert_to_string(value);
env_block += env_var + '\0'; SetEnvironmentVariableA(key.c_str(), env_var_value.c_str());
} }
env_block += '\0';
} }
// Create child process std::string cmd_line = "cmd.exe /c " + command_;
std::string cmd_line = command_;
char* cmd_str = const_cast<char*>(cmd_line.c_str());
char* cmd_line_ptr = _strdup(cmd_line.c_str());
// Create child process
BOOL success = CreateProcessA( BOOL success = CreateProcessA(
NULL, // Application name NULL, // Application name
cmd_str, // Command line cmd_line_ptr, // Command line
NULL, // Process security attributes NULL, // Process security attributes
NULL, // Thread security attributes NULL, // Thread security attributes
TRUE, // Inherit handles TRUE, // Inherit handles
CREATE_NO_WINDOW, // Creation flags CREATE_NO_WINDOW, // Creation flags
env_vars_.empty() ? NULL : (LPVOID)env_block.c_str(), // Environment variables NULL, // Environment variables
NULL, // Current directory NULL, // Current directory
&si, // Startup info &si, // Startup info
&pi // Process info &pi // Process info
); );
free(cmd_line_ptr);
if (!success) { if (!success) {
LOG_ERROR("Failed to create process: ", GetLastError()); LOG_ERROR("Failed to create process: ", GetLastError());
CloseHandle(child_stdin_read); CloseHandle(child_stdin_read);
@ -323,7 +324,8 @@ bool stdio_client::start_server_process() {
// Set non-blocking mode // Set non-blocking mode
DWORD mode = PIPE_NOWAIT; DWORD mode = PIPE_NOWAIT;
SetNamedPipeHandleState(stdout_pipe_[0], &mode, NULL, NULL); DWORD timeout = 100; // milliseconds
SetNamedPipeHandleState(stdout_pipe_[0], &mode, NULL, &timeout);
#else #else
// POSIX implementation // POSIX implementation
@ -459,7 +461,7 @@ bool stdio_client::start_server_process() {
// Wait for a while to ensure process starts // Wait for a while to ensure process starts
std::this_thread::sleep_for(std::chrono::milliseconds(500)); std::this_thread::sleep_for(std::chrono::milliseconds(500));
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32)
// Check if process is still running // Check if process is still running
DWORD exit_code; DWORD exit_code;
if (GetExitCodeProcess(process_handle_, &exit_code) && exit_code != STILL_ACTIVE) { if (GetExitCodeProcess(process_handle_, &exit_code) && exit_code != STILL_ACTIVE) {
@ -491,7 +493,7 @@ void stdio_client::stop_server_process() {
running_ = false; running_ = false;
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32)
// Windows implementation // Windows implementation
// Close pipes // Close pipes
if (stdin_pipe_[1] != NULL) { if (stdin_pipe_[1] != NULL) {
@ -584,15 +586,21 @@ void stdio_client::read_thread_func() {
char buffer[buffer_size]; char buffer[buffer_size];
std::string data_buffer; std::string data_buffer;
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32)
// Windows implementation // Windows implementation
DWORD bytes_read; DWORD bytes_read;
int retry_count = 0;
// Give the process some startup time (similar to UNIX implementation)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
while (running_) { while (running_) {
// Read data // Read data
BOOL success = ReadFile(stdout_pipe_[0], buffer, buffer_size - 1, &bytes_read, NULL); BOOL success = ReadFile(stdout_pipe_[0], buffer, buffer_size - 1, &bytes_read, NULL);
if (success && bytes_read > 0) { if (success && bytes_read > 0) {
// Successfully read data
retry_count = 0; // Reset retry count
buffer[bytes_read] = '\0'; buffer[bytes_read] = '\0';
data_buffer.append(buffer, bytes_read); data_buffer.append(buffer, bytes_read);
@ -644,20 +652,55 @@ void stdio_client::read_thread_func() {
} }
} else if (!success) { } else if (!success) {
DWORD error = GetLastError(); DWORD error = GetLastError();
if (error == ERROR_BROKEN_PIPE || error == ERROR_NO_DATA) {
// Pipe is closed or no data available if (error == ERROR_BROKEN_PIPE) {
LOG_WARNING("Pipe closed by server or no data available"); // The pipe is closed - check if the process is still running
break; DWORD exit_code;
if (GetExitCodeProcess(process_handle_, &exit_code) && exit_code != STILL_ACTIVE) {
LOG_WARNING("Service process exited, exit code: ", exit_code);
break;
}
// The pipe is closed but the process is still running - it might be a temporary state
retry_count++;
if (retry_count > 5) {
LOG_ERROR("The pipe is closed but the process is still running, retry count has reached the limit");
break;
}
// Retry after a short delay
std::this_thread::sleep_for(std::chrono::milliseconds(50 * retry_count));
} else if (error == ERROR_NO_DATA) {
// Simulate UNIX's EAGAIN/EWOULDBLOCK behavior
// The pipe is temporarily empty - this is normal for non-blocking mode
std::this_thread::sleep_for(std::chrono::milliseconds(10));
} else if (error != ERROR_IO_PENDING) { } else if (error != ERROR_IO_PENDING) {
// Other errors, log and retry
LOG_ERROR("Error reading from pipe: ", error); LOG_ERROR("Error reading from pipe: ", error);
retry_count++;
if (retry_count > 10) {
LOG_ERROR("Read error retry count has reached the limit, exiting read loop");
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(20 * retry_count));
} else {
// IO_PENDING is the normal state for asynchronous IO
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
} else {
// ReadFile successfully but no data - similar to reading 0 bytes but not EOF on UNIX
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
// Periodically check if the process is still running (similar to UNIX's waitpid check)
if (retry_count > 3) {
DWORD exit_code;
if (GetExitCodeProcess(process_handle_, &exit_code) && exit_code != STILL_ACTIVE) {
LOG_WARNING("Service process exited, exit code: ", exit_code);
break; break;
} }
// No data to read in non-blocking mode
std::this_thread::sleep_for(std::chrono::milliseconds(10));
} else {
// No data to read in non-blocking mode
std::this_thread::sleep_for(std::chrono::milliseconds(10));
} }
} }
#else #else
@ -743,7 +786,7 @@ json stdio_client::send_jsonrpc(const request& req) {
json req_json = req.to_json(); json req_json = req.to_json();
std::string req_str = req_json.dump() + "\n"; std::string req_str = req_json.dump() + "\n";
#if defined(_WIN32) || defined(_WIN64) #if defined(_WIN32)
// Windows implementation // Windows implementation
DWORD bytes_written; DWORD bytes_written;
BOOL success = WriteFile(stdin_pipe_[1], req_str.c_str(), static_cast<DWORD>(req_str.size()), &bytes_written, NULL); BOOL success = WriteFile(stdin_pipe_[1], req_str.c_str(), static_cast<DWORD>(req_str.size()), &bytes_written, NULL);