From 1b54308d81fec2d1ecc435f29283ae7e3738dbbf Mon Sep 17 00:00:00 2001 From: hkr04 Date: Wed, 12 Mar 2025 22:45:17 +0800 Subject: [PATCH] clean some debug info with logger --- CMakeLists.txt | 6 +- examples/client_example.cpp | 13 +- examples/server_example.cpp | 15 +- include/mcp_client.h | 1 + include/mcp_logger.h | 10 +- include/mcp_server.h | 9 - src/mcp_client.cpp | 358 ++++++-------------------- src/mcp_server.cpp | 212 ++++----------- test/CMakeLists.txt | 9 + test/test_mcp_lifecycle_transport.cpp | 184 +++++++++++++ test/test_mcp_tool.cpp | 10 +- test/test_mcp_tools_extended.cpp | 357 +++++++++++++++++++++++++ test/test_mcp_versioning.cpp | 140 ++++++++++ 13 files changed, 829 insertions(+), 495 deletions(-) create mode 100644 test/test_mcp_lifecycle_transport.cpp create mode 100644 test/test_mcp_tools_extended.cpp create mode 100644 test/test_mcp_versioning.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index feba029..da91741 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -33,9 +33,5 @@ if(MCP_BUILD_TESTS) enable_testing() add_subdirectory(test) - # Add custom test target - add_custom_target(run_tests - COMMAND ${CMAKE_CTEST_COMMAND} --output-on-failure - COMMENT "Running MCP tests..." - ) + # 注意:run_tests目标已在test/CMakeLists.txt中定义,此处不再重复定义 endif() diff --git a/examples/client_example.cpp b/examples/client_example.cpp index a747245..f38a41c 100644 --- a/examples/client_example.cpp +++ b/examples/client_example.cpp @@ -52,6 +52,8 @@ int main() { for (const auto& tool : tools) { std::cout << "- " << tool.name << ": " << tool.description << std::endl; } + + // Get available resources // Call the get_time tool std::cout << "\nCalling get_time tool..." << std::endl; @@ -76,17 +78,6 @@ int main() { }; mcp::json calc_result = client.call_tool("calculator", calc_params); std::cout << "10 + 5 = " << calc_result["content"][0]["text"].get() << std::endl; - - // Not implemented yet - // // Access a resource - // std::cout << "\nAccessing API resource..." << std::endl; - // mcp::json api_params = { - // {"endpoint", "hello"}, - // {"name", "MCP Client"} - // }; - // mcp::json api_result = client.access_resource("/api", api_params); - // std::cout << "API response: " << api_result["contents"][0]["text"].get() << std::endl; - } catch (const mcp::mcp_exception& e) { std::cerr << "MCP error: " << e.what() << " (code: " << static_cast(e.code()) << ")" << std::endl; return 1; diff --git a/examples/server_example.cpp b/examples/server_example.cpp index 844eacb..2af0aca 100644 --- a/examples/server_example.cpp +++ b/examples/server_example.cpp @@ -127,7 +127,8 @@ int main() { // Set server capabilities mcp::json capabilities = { - {"tools", {{"listChanged", true}}} + {"tools", {{"listChanged", true}}}, + {"resources", {{"subscribe", false}, {"listChanged", true}}} }; server.set_capabilities(capabilities); @@ -154,15 +155,9 @@ int main() { server.register_tool(echo_tool, echo_handler); server.register_tool(calc_tool, calculator_handler); - // Not implemented yet - // // Register resources - // auto file_resource = std::make_shared("./files"); - // server.register_resource("/files", file_resource); - - // auto api_resource = std::make_shared("API", "Custom API endpoints"); - // api_resource->register_handler("hello", hello_handler, "Say hello"); - - // server.register_resource("/api", api_resource); + // Register resources + auto file_resource = std::make_shared("./Makefile"); + server.register_resource("file://./Makefile", file_resource); // Start server std::cout << "Starting MCP server at localhost:8888..." << std::endl; diff --git a/include/mcp_client.h b/include/mcp_client.h index cf69ad5..f929829 100644 --- a/include/mcp_client.h +++ b/include/mcp_client.h @@ -11,6 +11,7 @@ #include "mcp_message.h" #include "mcp_tool.h" +#include "mcp_logger.h" // Include the HTTP library #include "httplib.h" diff --git a/include/mcp_logger.h b/include/mcp_logger.h index 62b9baa..8ea3ef5 100644 --- a/include/mcp_logger.h +++ b/include/mcp_logger.h @@ -83,19 +83,19 @@ private: ss << std::put_time(now_tm, "%Y-%m-%d %H:%M:%S") << " "; - // 添加日志级别 + // 添加日志级别和颜色 switch (level) { case log_level::debug: - ss << "[DEBUG] "; + ss << "\033[36m[DEBUG]\033[0m "; // 青色 break; case log_level::info: - ss << "[INFO] "; + ss << "\033[32m[INFO]\033[0m "; // 绿色 break; case log_level::warning: - ss << "[WARNING] "; + ss << "\033[33m[WARNING]\033[0m "; // 黄色 break; case log_level::error: - ss << "[ERROR] "; + ss << "\033[31m[ERROR]\033[0m "; // 红色 break; } diff --git a/include/mcp_server.h b/include/mcp_server.h index 49e19e0..4a03d23 100644 --- a/include/mcp_server.h +++ b/include/mcp_server.h @@ -79,13 +79,11 @@ public: try { bool write_result = sink->write(message_copy.data(), message_copy.size()); if (!write_result) { - std::cerr << "写入事件数据失败: 客户端可能已关闭连接" << std::endl; close(); return false; } return true; } catch (const std::exception& e) { - std::cerr << "写入事件数据失败: " << e.what() << std::endl; close(); return false; } @@ -232,13 +230,6 @@ public: */ void send_request(const std::string& session_id, const std::string& method, const json& params = json::object()); - /** - * @brief 打印服务器状态 - * - * 打印当前服务器的状态,包括活跃的会话、注册的方法等 - */ - void print_status() const; - private: std::string host_; int port_; diff --git a/src/mcp_client.cpp b/src/mcp_client.cpp index f6f8ba5..5821f41 100644 --- a/src/mcp_client.cpp +++ b/src/mcp_client.cpp @@ -23,55 +23,41 @@ client::client(const std::string& base_url, const json& capabilities, const std: } client::~client() { - // 关闭SSE连接 close_sse_connection(); - - // httplib::Client将自动销毁 } void client::init_client(const std::string& host, int port) { - // 创建两个独立的HTTP客户端实例 - // 一个用于SSE连接,一个用于JSON-RPC请求 http_client_ = std::make_unique(host.c_str(), port); sse_client_ = std::make_unique(host.c_str(), port); - // 设置超时 http_client_->set_connection_timeout(timeout_seconds_, 0); http_client_->set_read_timeout(timeout_seconds_, 0); http_client_->set_write_timeout(timeout_seconds_, 0); - // SSE客户端需要更长的超时时间,因为它会保持长连接 sse_client_->set_connection_timeout(timeout_seconds_ * 2, 0); sse_client_->set_write_timeout(timeout_seconds_, 0); } void client::init_client(const std::string& base_url) { - // 创建两个独立的HTTP客户端实例 - // 一个用于SSE连接,一个用于JSON-RPC请求 http_client_ = std::make_unique(base_url.c_str()); sse_client_ = std::make_unique(base_url.c_str()); - // 设置超时 http_client_->set_connection_timeout(timeout_seconds_, 0); http_client_->set_read_timeout(timeout_seconds_, 0); http_client_->set_write_timeout(timeout_seconds_, 0); - // SSE客户端需要更长的超时时间,因为它会保持长连接 sse_client_->set_connection_timeout(timeout_seconds_ * 2, 0); - sse_client_->set_read_timeout(0, 0); // 无限读取超时,适合SSE长连接 + sse_client_->set_read_timeout(0, 0); sse_client_->set_write_timeout(timeout_seconds_, 0); } bool client::initialize(const std::string& client_name, const std::string& client_version) { - std::cerr << "开始初始化MCP客户端..." << std::endl; + LOG_INFO("Initializing MCP client..."); - // 检查服务器是否可访问 if (!check_server_accessible()) { - std::cerr << "服务器不可访问,初始化失败" << std::endl; return false; } - // Create initialization request request req = request::create("initialize", { {"protocolVersion", MCP_VERSION}, {"capabilities", capabilities_}, @@ -82,88 +68,63 @@ bool client::initialize(const std::string& client_name, const std::string& clien }); try { - // 打开SSE连接 - std::cerr << "正在打开SSE连接..." << std::endl; + LOG_INFO("Opening SSE connection..."); open_sse_connection(); - // 等待SSE连接建立并获取消息端点 - // 使用条件变量和超时机制 - const auto timeout = std::chrono::milliseconds(5000); // 5秒超时 + const auto timeout = std::chrono::milliseconds(5000); { std::unique_lock lock(mutex_); - // 检查初始状态 - if (!msg_endpoint_.empty()) { - std::cerr << "消息端点已经设置: " << msg_endpoint_ << std::endl; - } else { - std::cerr << "等待条件变量..." << std::endl; - } - bool success = endpoint_cv_.wait_for(lock, timeout, [this]() { if (!sse_running_) { - std::cerr << "SSE连接已关闭,停止等待" << std::endl; + LOG_WARNING("SSE connection closed, stopping wait"); return true; } if (!msg_endpoint_.empty()) { - std::cerr << "消息端点已设置,停止等待" << std::endl; + LOG_INFO("Message endpoint set, stopping wait"); return true; } return false; }); - // 检查等待结果 if (!success) { - std::cerr << "条件变量等待超时" << std::endl; + LOG_WARNING("Condition variable wait timed out"); } - // 如果SSE连接已关闭或等待超时,抛出异常 if (!sse_running_) { - throw std::runtime_error("SSE连接已关闭,未能获取消息端点"); + throw std::runtime_error("SSE connection closed, failed to get message endpoint"); } if (msg_endpoint_.empty()) { - throw std::runtime_error("等待SSE连接超时,未能获取消息端点"); + throw std::runtime_error("Timeout waiting for SSE connection, failed to get message endpoint"); } - std::cerr << "成功获取消息端点: " << msg_endpoint_ << std::endl; + LOG_INFO("Successfully got message endpoint: ", msg_endpoint_); } - // 发送初始化请求 json result = send_jsonrpc(req); - // 存储服务器能力 server_capabilities_ = result["capabilities"]; - // 发送已初始化通知 request notification = request::create_notification("initialized"); send_jsonrpc(notification); return true; } catch (const std::exception& e) { - // 初始化失败,关闭SSE连接 - std::cerr << "初始化失败: " << e.what() << std::endl; + LOG_ERROR("Initialization failed: ", e.what()); close_sse_connection(); return false; } } bool client::ping() { - // Create ping request request req = request::create("ping", {}); try { - // Send the request json result = send_jsonrpc(req); - - // The receiver MUST respond promptly with an empty response - if (result.empty()) { - return true; - } else { - return false; - } + return result.empty(); } catch (const std::exception& e) { - // Ping failed return false; } } @@ -171,8 +132,6 @@ bool client::ping() { void client::set_auth_token(const std::string& token) { std::lock_guard lock(mutex_); auth_token_ = token; - - // 为两个客户端都添加默认头 set_header("Authorization", "Bearer " + auth_token_); } @@ -180,7 +139,6 @@ void client::set_header(const std::string& key, const std::string& value) { std::lock_guard lock(mutex_); default_headers_[key] = value; - // 确保两个客户端实例都设置了相同的头 if (http_client_) { http_client_->set_default_headers({{key, value}}); } @@ -193,7 +151,6 @@ void client::set_timeout(int timeout_seconds) { std::lock_guard lock(mutex_); timeout_seconds_ = timeout_seconds; - // 更新两个客户端的超时设置 if (http_client_) { http_client_->set_connection_timeout(timeout_seconds_, 0); http_client_->set_write_timeout(timeout_seconds_, 0); @@ -239,21 +196,28 @@ json client::call_tool(const std::string& tool_name, const json& arguments) { } std::vector client::get_tools() { - json tools_json = send_request("tools/list", {}).result; + json response_json = send_request("tools/list", {}).result; std::vector tools; - if (tools_json.is_array()) { - for (const auto& tool_json : tools_json) { - tool t; - t.name = tool_json["name"]; - t.description = tool_json["description"]; - - if (tool_json.contains("inputSchema")) { - t.parameters_schema = tool_json["inputSchema"]; - } - - tools.push_back(t); + json tools_json; + if (response_json.contains("tools") && response_json["tools"].is_array()) { + tools_json = response_json["tools"]; + } else if (response_json.is_array()) { + tools_json = response_json; + } else { + return tools; + } + + for (const auto& tool_json : tools_json) { + tool t; + t.name = tool_json["name"]; + t.description = tool_json["description"]; + + if (tool_json.contains("inputSchema")) { + t.parameters_schema = tool_json["inputSchema"]; } + + tools.push_back(t); } return tools; @@ -288,130 +252,82 @@ json client::list_resource_templates() { } void client::open_sse_connection() { - // 设置SSE连接状态为运行中 sse_running_ = true; - // 清空消息端点 { std::lock_guard lock(mutex_); msg_endpoint_.clear(); - - // 通知等待的线程(虽然消息端点为空,但可以让等待的线程检查sse_running_状态) endpoint_cv_.notify_all(); } - // 打印连接信息(调试用) std::string connection_info; if (!base_url_.empty()) { connection_info = "Base URL: " + base_url_ + ", SSE Endpoint: " + sse_endpoint_; } else { connection_info = "Host: " + host_ + ", Port: " + std::to_string(port_) + ", SSE Endpoint: " + sse_endpoint_; } - std::cerr << "尝试建立SSE连接: " << connection_info << std::endl; + LOG_INFO("Attempting to establish SSE connection: ", connection_info); - // 创建并启动SSE线程 sse_thread_ = std::make_unique([this]() { int retry_count = 0; const int max_retries = 5; - const int retry_delay_base = 1000; // 毫秒 + const int retry_delay_base = 1000; while (sse_running_) { try { - // 尝试建立SSE连接 - std::cerr << "SSE线程: 尝试连接到 " << sse_endpoint_ << std::endl; + LOG_INFO("SSE thread: Attempting to connect to ", sse_endpoint_); - // 使用专用的SSE客户端实例 auto res = sse_client_->Get(sse_endpoint_.c_str(), [this](const char *data, size_t data_length) { - // 解析SSE数据 - std::cerr << "SSE线程: 收到数据,长度: " << data_length << std::endl; if (!parse_sse_data(data, data_length)) { - std::cerr << "SSE线程: 解析数据失败" << std::endl; - return false; // 解析失败,关闭连接 + LOG_ERROR("SSE thread: Failed to parse data"); + return false; } - // 检查是否应该关闭连接 bool should_continue = sse_running_.load(); if (!should_continue) { - std::cerr << "SSE线程: sse_running_为false,关闭连接" << std::endl; + LOG_INFO("SSE thread: sse_running_ is false, closing connection"); } - return should_continue; // 如果sse_running_为false,关闭连接 + return should_continue; }); - // 检查连接是否成功 if (!res) { - std::string error_msg = "SSE连接失败: "; - error_msg += "错误代码: " + std::to_string(static_cast(res.error())); - - // 添加更详细的错误信息 - switch (res.error()) { - case httplib::Error::Connection: - error_msg += " (连接错误,服务器可能未运行或无法访问)"; - break; - case httplib::Error::Read: - error_msg += " (读取错误,服务器可能关闭了连接或响应格式不正确)"; - break; - case httplib::Error::Write: - error_msg += " (写入错误)"; - break; - case httplib::Error::ConnectionTimeout: - error_msg += " (连接超时)"; - break; - case httplib::Error::Canceled: - // 如果是由于sse_running_=false导致的取消,这是正常的关闭过程 - if (!sse_running_) { - std::cerr << "SSE连接已被主动关闭 (请求被取消)" << std::endl; - return; // 直接返回,不再重试 - } else { - error_msg += " (请求被取消)"; - } - break; - default: - error_msg += " (未知错误)"; - break; - } - + std::string error_msg = "SSE connection failed: "; + error_msg += httplib::to_string(res.error()); throw std::runtime_error(error_msg); } - // 连接成功后重置重试计数 retry_count = 0; - std::cerr << "SSE线程: 连接成功" << std::endl; + LOG_INFO("SSE thread: Connection successful"); } catch (const std::exception& e) { - // 记录错误 - std::cerr << "SSE连接错误: " << e.what() << std::endl; + LOG_ERROR("SSE connection error: ", e.what()); - // 如果sse_running_为false,说明是主动关闭,不需要重试 if (!sse_running_) { - std::cerr << "SSE连接已被主动关闭,不再重试" << std::endl; + LOG_INFO("SSE connection actively closed, no retry needed"); break; } - // 如果已达到最大重试次数,停止尝试 if (++retry_count > max_retries) { - std::cerr << "达到最大重试次数,停止SSE连接尝试" << std::endl; + LOG_ERROR("Maximum retry count reached, stopping SSE connection attempts"); break; } - // 指数退避重试 - int delay = retry_delay_base * (1 << (retry_count - 1)); // 2^(retry_count-1) * base_delay - std::cerr << "将在 " << delay << " 毫秒后重试 (尝试 " << retry_count << "/" << max_retries << ")" << std::endl; + int delay = retry_delay_base * (1 << (retry_count - 1)); + LOG_INFO("Will retry in ", delay, " ms (attempt ", retry_count, "/", max_retries, ")"); - // 在等待期间定期检查sse_running_状态 - const int check_interval = 100; // 每100毫秒检查一次 + const int check_interval = 100; for (int waited = 0; waited < delay && sse_running_; waited += check_interval) { std::this_thread::sleep_for(std::chrono::milliseconds(check_interval)); } - // 如果在等待期间sse_running_变为false,直接退出 if (!sse_running_) { - std::cerr << "在等待重试期间检测到SSE连接已被主动关闭,不再重试" << std::endl; + LOG_INFO("SSE connection actively closed during retry wait, stopping retry"); break; } } } - std::cerr << "SSE线程: 退出" << std::endl; + LOG_INFO("SSE thread: Exiting"); }); } @@ -419,278 +335,187 @@ bool client::parse_sse_data(const char* data, size_t length) { try { std::string sse_data(data, length); - // 查找事件类型 - std::string event_type = "message"; // 默认事件类型 + std::string event_type = "message"; auto event_pos = sse_data.find("event: "); if (event_pos != std::string::npos) { auto event_end = sse_data.find("\n", event_pos); if (event_end != std::string::npos) { event_type = sse_data.substr(event_pos + 7, event_end - (event_pos + 7)); - // 移除可能的回车符 if (!event_type.empty() && event_type.back() == '\r') { event_type.pop_back(); } } } - // 查找"data:"标记 auto data_pos = sse_data.find("data: "); if (data_pos == std::string::npos) { - return true; // 不是数据事件,可能是注释或心跳,继续保持连接 + return true; } - // 查找数据行结束位置 auto newline_pos = sse_data.find("\n", data_pos); if (newline_pos == std::string::npos) { - newline_pos = sse_data.length(); // 如果没有换行符,使用整个字符串 + newline_pos = sse_data.length(); } - // 提取数据内容 std::string data_content = sse_data.substr(data_pos + 6, newline_pos - (data_pos + 6)); - // 处理不同类型的事件 if (event_type == "heartbeat") { - // 心跳事件,不需要处理数据 return true; } else if (event_type == "endpoint") { - // 端点事件,更新消息端点 std::lock_guard lock(mutex_); msg_endpoint_ = data_content; - - // 通知等待的线程 endpoint_cv_.notify_all(); return true; } else if (event_type == "message") { - // 消息事件,尝试解析为JSON-RPC响应 try { json response = json::parse(data_content); - // 检查是否是有效的JSON-RPC响应 if (response.contains("jsonrpc") && response.contains("id") && !response["id"].is_null()) { json id = response["id"]; - // 查找对应的请求 std::lock_guard lock(response_mutex_); auto it = pending_requests_.find(id); if (it != pending_requests_.end()) { - // 设置响应结果 if (response.contains("result")) { it->second.set_value(response["result"]); } else if (response.contains("error")) { - // 创建一个包含错误信息的JSON对象 json error_result = { {"isError", true}, {"error", response["error"]} }; it->second.set_value(error_result); } else { - // 设置空结果 it->second.set_value(json::object()); } - // 移除已完成的请求 pending_requests_.erase(it); } else { - std::cerr << "收到未知请求ID的响应: " << id << std::endl; + LOG_WARNING("Received response for unknown request ID: ", id); } } else { - std::cerr << "收到无效的JSON-RPC响应: " << response.dump() << std::endl; + LOG_WARNING("Received invalid JSON-RPC response: ", response.dump()); } } catch (const json::exception& e) { - std::cerr << "解析JSON-RPC响应失败: " << e.what() << std::endl; + LOG_ERROR("Failed to parse JSON-RPC response: ", e.what()); } return true; } else { - // 未知事件类型,记录但继续保持连接 - std::cerr << "收到未知事件类型: " << event_type << std::endl; + LOG_WARNING("Received unknown event type: ", event_type); return true; } } catch (const std::exception& e) { - std::cerr << "解析SSE数据错误: " << e.what() << std::endl; + LOG_ERROR("Error parsing SSE data: ", e.what()); return false; } } void client::close_sse_connection() { - // 检查是否已经关闭 if (!sse_running_) { - std::cerr << "SSE连接已经关闭,无需再次关闭" << std::endl; + LOG_INFO("SSE connection already closed"); return; } - std::cerr << "正在主动关闭SSE连接(正常退出流程)..." << std::endl; + LOG_INFO("Actively closing SSE connection (normal exit flow)..."); - // 设置标志,这将导致SSE回调函数返回false,从而关闭连接 sse_running_ = false; - // 给一些时间让回调函数返回false并关闭连接 std::this_thread::sleep_for(std::chrono::milliseconds(500)); - // 等待SSE线程结束 if (sse_thread_ && sse_thread_->joinable()) { - // 设置一个合理的超时时间,例如5秒 auto timeout = std::chrono::seconds(5); auto start = std::chrono::steady_clock::now(); - std::cerr << "等待SSE线程结束..." << std::endl; + LOG_INFO("Waiting for SSE thread to end..."); - // 尝试在超时前等待线程结束 while (sse_thread_->joinable() && std::chrono::steady_clock::now() - start < timeout) { try { - // 尝试立即加入线程 sse_thread_->join(); - std::cerr << "SSE线程已成功结束" << std::endl; - break; // 如果成功加入,跳出循环 + LOG_INFO("SSE thread successfully ended"); + break; } catch (const std::exception& e) { - std::cerr << "等待SSE线程时出错: " << e.what() << std::endl; - // 短暂休眠,避免CPU占用过高 + LOG_ERROR("Error waiting for SSE thread: ", e.what()); std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } - // 如果线程仍然没有结束,记录警告并分离线程 if (sse_thread_->joinable()) { - std::cerr << "警告: SSE线程未能在超时时间内结束,分离线程" << std::endl; + LOG_WARNING("SSE thread did not end within timeout, detaching thread"); sse_thread_->detach(); } - } else { - std::cerr << "SSE线程不存在或已经结束" << std::endl; } - // 清空消息端点 { std::lock_guard lock(mutex_); msg_endpoint_.clear(); - - // 通知等待的线程(虽然消息端点为空,但可以让等待的线程检查sse_running_状态) endpoint_cv_.notify_all(); } - std::cerr << "SSE连接已成功关闭(正常退出流程)" << std::endl; + LOG_INFO("SSE connection successfully closed (normal exit flow)"); } json client::send_jsonrpc(const request& req) { std::lock_guard lock(mutex_); - // 检查消息端点是否已设置 if (msg_endpoint_.empty()) { - throw mcp_exception(error_code::internal_error, "消息端点未设置,SSE连接可能未建立"); + throw mcp_exception(error_code::internal_error, "Message endpoint not set, SSE connection may not be established"); } - // 打印请求信息(调试用) - std::cerr << "发送JSON-RPC请求: 方法=" << req.method << ", 端点=" << msg_endpoint_ << std::endl; - - // Convert request to JSON json req_json = req.to_json(); std::string req_body = req_json.dump(); - // Prepare headers httplib::Headers headers; headers.emplace("Content-Type", "application/json"); - // Add default headers for (const auto& [key, value] : default_headers_) { headers.emplace(key, value); } - // Check if it's a notification (no response expected) if (req.is_notification()) { - // 使用主HTTP客户端发送请求 auto result = http_client_->Post(msg_endpoint_, headers, req_body, "application/json"); if (!result) { - // Error occurred auto err = result.error(); - std::string error_msg; - - switch (err) { - case httplib::Error::Connection: - error_msg = "连接错误,服务器可能未运行或无法访问"; - break; - case httplib::Error::Read: - error_msg = "读取错误,服务器可能关闭了连接或响应格式不正确"; - break; - case httplib::Error::Write: - error_msg = "写入错误"; - break; - case httplib::Error::ConnectionTimeout: - error_msg = "连接超时"; - break; - default: - error_msg = "HTTP客户端错误: " + std::to_string(static_cast(err)); - break; - } - - std::cerr << "JSON-RPC请求失败: " << error_msg << std::endl; + std::string error_msg = httplib::to_string(err); + LOG_ERROR("JSON-RPC request failed: ", error_msg); throw mcp_exception(error_code::internal_error, error_msg); } return json::object(); } - // 创建Promise和Future,用于等待响应 std::promise response_promise; std::future response_future = response_promise.get_future(); - // 将请求ID和Promise添加到映射表 { std::lock_guard response_lock(response_mutex_); pending_requests_[req.id] = std::move(response_promise); } - // 使用主HTTP客户端发送请求 auto result = http_client_->Post(msg_endpoint_, headers, req_body, "application/json"); if (!result) { - // Error occurred auto err = result.error(); - std::string error_msg; + std::string error_msg = httplib::to_string(err); - switch (err) { - case httplib::Error::Connection: - error_msg = "连接错误,服务器可能未运行或无法访问"; - break; - case httplib::Error::Read: - error_msg = "读取错误,服务器可能关闭了连接或响应格式不正确"; - break; - case httplib::Error::Write: - error_msg = "写入错误"; - break; - case httplib::Error::ConnectionTimeout: - error_msg = "连接超时"; - break; - default: - error_msg = "HTTP客户端错误: " + std::to_string(static_cast(err)); - break; - } - - // 移除请求 { std::lock_guard response_lock(response_mutex_); pending_requests_.erase(req.id); } - std::cerr << "JSON-RPC请求失败: " << error_msg << std::endl; + LOG_ERROR("JSON-RPC request failed: ", error_msg); throw mcp_exception(error_code::internal_error, error_msg); } - // 检查HTTP状态码 if (result->status != 202) { - // 非202状态码,尝试解析响应 try { json res_json = json::parse(result->body); - // 打印响应信息(调试用) - std::cerr << "收到HTTP响应: 状态码=" << result->status << ", 内容=" << res_json.dump() << std::endl; - - // 移除请求 { std::lock_guard response_lock(response_mutex_); pending_requests_.erase(req.id); } - // Check for error if (res_json.contains("error")) { int code = res_json["error"]["code"]; std::string message = res_json["error"]["message"]; @@ -698,14 +523,12 @@ json client::send_jsonrpc(const request& req) { throw mcp_exception(static_cast(code), message); } - // Return result if (res_json.contains("result")) { return res_json["result"]; } else { return json::object(); } } catch (const json::exception& e) { - // 移除请求 { std::lock_guard response_lock(response_mutex_); pending_requests_.erase(req.id); @@ -715,20 +538,13 @@ json client::send_jsonrpc(const request& req) { "Failed to parse JSON-RPC response: " + std::string(e.what())); } } else { - // 202状态码,等待SSE响应 - std::cerr << "收到202 Accepted响应,等待SSE响应..." << std::endl; - - // 设置超时时间 const auto timeout = std::chrono::seconds(timeout_seconds_); - // 等待响应 auto status = response_future.wait_for(timeout); if (status == std::future_status::ready) { - // 获取响应 json response = response_future.get(); - // 检查是否是错误响应 if (response.contains("isError") && response["isError"].get()) { int code = response["error"]["code"]; std::string message = response["error"]["message"]; @@ -738,54 +554,32 @@ json client::send_jsonrpc(const request& req) { return response; } else { - // 超时,移除请求 { std::lock_guard response_lock(response_mutex_); pending_requests_.erase(req.id); } - throw mcp_exception(error_code::internal_error, "等待SSE响应超时"); + throw mcp_exception(error_code::internal_error, "Timeout waiting for SSE response"); } } } bool client::check_server_accessible() { - std::cerr << "检查服务器是否可访问..." << std::endl; + LOG_INFO("Checking if server is accessible..."); try { - // 尝试发送一个简单的GET请求到服务器 auto res = http_client_->Get("/"); if (res) { - std::cerr << "服务器可访问,状态码: " << res->status << std::endl; + LOG_INFO("Server is accessible, status code: ", res->status); return true; } else { - std::string error_msg = "服务器不可访问,错误代码: " + std::to_string(static_cast(res.error())); - - // 添加更详细的错误信息 - switch (res.error()) { - case httplib::Error::Connection: - error_msg += " (连接错误,服务器可能未运行或无法访问)"; - break; - case httplib::Error::Read: - error_msg += " (读取错误)"; - break; - case httplib::Error::Write: - error_msg += " (写入错误)"; - break; - case httplib::Error::ConnectionTimeout: - error_msg += " (连接超时)"; - break; - default: - error_msg += " (未知错误)"; - break; - } - - std::cerr << error_msg << std::endl; + std::string error_msg = "Server not accessible: " + httplib::to_string(res.error()); + LOG_ERROR(error_msg); return false; } } catch (const std::exception& e) { - std::cerr << "检查服务器可访问性时发生异常: " << e.what() << std::endl; + LOG_ERROR("Exception while checking server accessibility: ", e.what()); return false; } } diff --git a/src/mcp_server.cpp b/src/mcp_server.cpp index a7a3091..71213d8 100644 --- a/src/mcp_server.cpp +++ b/src/mcp_server.cpp @@ -14,14 +14,6 @@ server::server(const std::string& host, int port, const std::string& sse_endpoin : host_(host), port_(port), sse_endpoint_(sse_endpoint), msg_endpoint_(msg_endpoint_prefix), name_("MCP Server"), version_(MCP_VERSION) { http_server_ = std::make_unique(); - - // Set default capabilities - capabilities_ = { - {"resources", { - {"subscribe", true}, - {"listChanged", true} - }} - }; } server::~server() { @@ -33,7 +25,7 @@ bool server::start(bool blocking) { return true; // Already running } - std::cerr << "启动MCP服务器: " << host_ << ":" << port_ << std::endl; + LOG_INFO("Starting MCP server on ", host_, ":", port_); // 设置CORS处理 http_server_->Options(".*", [](const httplib::Request& req, httplib::Response& res) { @@ -46,36 +38,36 @@ bool server::start(bool blocking) { // 设置JSON-RPC端点 http_server_->Post(msg_endpoint_.c_str(), [this](const httplib::Request& req, httplib::Response& res) { this->handle_jsonrpc(req, res); + LOG_INFO(req.remote_addr, ":", req.remote_port, " - \"POST ", req.path, " HTTP/1.1\" ", res.status); }); // 设置SSE端点 http_server_->Get(sse_endpoint_.c_str(), [this](const httplib::Request& req, httplib::Response& res) { this->handle_sse(req, res); + LOG_INFO(req.remote_addr, ":", req.remote_port, " - \"GET ", req.path, " HTTP/1.1\" ", res.status); }); // 启动服务器 if (blocking) { running_ = true; - std::cerr << "以阻塞模式启动服务器" << std::endl; - print_status(); + LOG_INFO("Starting server in blocking mode"); if (!http_server_->listen(host_.c_str(), port_)) { running_ = false; - std::cerr << "无法在 " << host_ << ":" << port_ << " 上启动服务器" << std::endl; + LOG_ERROR("Failed to start server on ", host_, ":", port_); return false; } return true; } else { // 在单独的线程中启动 server_thread_ = std::make_unique([this]() { - std::cerr << "在单独的线程中启动服务器" << std::endl; + LOG_INFO("Starting server in separate thread"); if (!http_server_->listen(host_.c_str(), port_)) { - std::cerr << "无法在 " << host_ << ":" << port_ << " 上启动服务器" << std::endl; + LOG_ERROR("Failed to start server on ", host_, ":", port_); running_ = false; return; } }); running_ = true; - print_status(); return true; } } @@ -85,15 +77,13 @@ void server::stop() { return; } - std::cerr << "正在停止MCP服务器..." << std::endl; - print_status(); + LOG_INFO("Stopping MCP server..."); running_ = false; // 关闭所有SSE连接 std::vector session_ids; { std::lock_guard lock(mutex_); - // 先收集所有会话ID for (const auto& [session_id, _] : session_dispatchers_) { session_ids.push_back(session_id); } @@ -102,31 +92,25 @@ void server::stop() { // 关闭每个会话的分发器 for (const auto& session_id : session_ids) { try { - std::cerr << "关闭会话: " << session_id << std::endl; std::lock_guard lock(mutex_); auto it = session_dispatchers_.find(session_id); if (it != session_dispatchers_.end()) { it->second->close(); } } catch (const std::exception& e) { - std::cerr << "关闭会话时发生异常: " << session_id << ", " << e.what() << std::endl; + LOG_ERROR("Exception while closing session ", session_id, ": ", e.what()); } } - // 等待一段时间,让会话线程有机会自行清理 - std::cerr << "等待会话线程自行清理..." << std::endl; - std::this_thread::sleep_for(std::chrono::seconds(1)); - // 清理剩余的线程 { std::lock_guard lock(mutex_); for (auto& [session_id, thread] : sse_threads_) { if (thread && thread->joinable()) { try { - std::cerr << "分离会话线程: " << session_id << std::endl; thread->detach(); } catch (const std::exception& e) { - std::cerr << "分离会话线程时发生异常: " << session_id << ", " << e.what() << std::endl; + LOG_ERROR("Exception while detaching session thread ", session_id, ": ", e.what()); } } } @@ -137,16 +121,14 @@ void server::stop() { } if (http_server_) { - std::cerr << "停止HTTP服务器..." << std::endl; http_server_->stop(); } if (server_thread_ && server_thread_->joinable()) { - std::cerr << "等待服务器线程结束..." << std::endl; server_thread_->join(); } - std::cerr << "MCP服务器已停止" << std::endl; + LOG_INFO("MCP server stopped"); } bool server::is_running() const { @@ -212,10 +194,7 @@ void server::register_resource(const std::string& path, std::shared_ptr json { - // In this implementation, we don't support resource templates return json::array(); }; } @@ -310,18 +286,9 @@ void server::set_auth_handler(std::function handler) { } void server::handle_sse(const httplib::Request& req, httplib::Response& res) { - // 生成会话ID std::string session_id = generate_session_id(); std::string session_uri = msg_endpoint_ + "?session_id=" + session_id; - std::cerr << "新的SSE连接: 客户端=" << req.remote_addr << ", 会话ID=" << session_id << std::endl; - std::cerr << "会话URI: " << session_uri << std::endl; - std::cerr << "请求头: "; - for (const auto& [key, value] : req.headers) { - std::cerr << key << "=" << value << " "; - } - std::cerr << std::endl; - // 设置SSE响应头 res.set_header("Content-Type", "text/event-stream"); res.set_header("Cache-Control", "no-cache"); @@ -337,62 +304,46 @@ void server::handle_sse(const httplib::Request& req, httplib::Response& res) { session_dispatchers_[session_id] = session_dispatcher; } - // 创建会话线程,使用值捕获而不是引用捕获 - auto thread = std::make_unique([this, session_id, session_uri, session_dispatcher]() { + // 创建会话线程 + auto thread = std::make_unique([this, res, session_id, session_uri, session_dispatcher]() { try { - std::cerr << "SSE会话线程启动: " << session_id << std::endl; - - // 发送初始会话URI - 使用endpoint事件类型,符合MCP规范 + // 发送初始会话URI std::this_thread::sleep_for(std::chrono::milliseconds(500)); std::stringstream ss; ss << "event: endpoint\ndata: " << session_uri << "\n\n"; session_dispatcher->send_event(ss.str()); - std::cerr << "发送会话URI: " << session_uri << " 到会话: " << session_id << std::endl; // 定期发送心跳,检测连接状态 int heartbeat_count = 0; while (running_ && !session_dispatcher->is_closed()) { std::this_thread::sleep_for(std::chrono::seconds(10)); - // 检查分发器是否已关闭 - if (session_dispatcher->is_closed()) { - std::cerr << "会话已关闭,停止心跳: " << session_id << std::endl; + if (session_dispatcher->is_closed() || !running_) { break; } - // 检查服务器是否仍在运行 - if (!running_) { - std::cerr << "服务器已停止,停止心跳: " << session_id << std::endl; - break; - } - - // 发送心跳事件 - 使用自定义heartbeat事件类型 std::stringstream heartbeat; heartbeat << "event: heartbeat\ndata: " << heartbeat_count++ << "\n\n"; try { bool sent = session_dispatcher->send_event(heartbeat.str()); if (!sent) { - std::cerr << "发送心跳失败,客户端可能已关闭连接: " << session_id << std::endl; + LOG_WARNING("Failed to send heartbeat, client may have closed connection: ", session_id); break; } - std::cerr << "发送心跳到会话: " << session_id << ", 计数: " << heartbeat_count << std::endl; } catch (const std::exception& e) { - std::cerr << "发送心跳失败,假定连接已关闭: " << e.what() << std::endl; + LOG_ERROR("Failed to send heartbeat: ", e.what()); break; } } - - std::cerr << "SSE会话线程退出: " << session_id << std::endl; } catch (const std::exception& e) { - std::cerr << "SSE会话线程异常: " << session_id << ", " << e.what() << std::endl; + LOG_ERROR("SSE session thread exception: ", session_id, ", ", e.what()); } // 安全地清理资源 try { std::lock_guard lock(mutex_); - // 先关闭分发器 auto dispatcher_it = session_dispatchers_.find(session_id); if (dispatcher_it != session_dispatchers_.end()) { if (!dispatcher_it->second->is_closed()) { @@ -401,18 +352,13 @@ void server::handle_sse(const httplib::Request& req, httplib::Response& res) { session_dispatchers_.erase(dispatcher_it); } - // 再移除线程 auto thread_it = sse_threads_.find(session_id); if (thread_it != sse_threads_.end()) { - // 不要在线程内部join或detach自己 - // 只从映射中移除 - thread_it->second.release(); // 释放所有权但不删除线程对象 + thread_it->second.release(); sse_threads_.erase(thread_it); } - - std::cerr << "会话资源已清理: " << session_id << std::endl; } catch (const std::exception& e) { - std::cerr << "清理会话资源时发生异常: " << session_id << ", " << e.what() << std::endl; + LOG_ERROR("Exception while cleaning up session resources: ", session_id, ", ", e.what()); } }); @@ -430,9 +376,6 @@ void server::handle_sse(const httplib::Request& req, httplib::Response& res) { std::lock_guard lock(mutex_); auto it = session_dispatchers_.find(session_id); if (it == session_dispatchers_.end() || it->second->is_closed()) { - std::cerr << "会话已关闭,停止内容提供: " << session_id << std::endl; - - // 不在这里清理资源,让会话线程自己清理 return false; } } @@ -440,7 +383,7 @@ void server::handle_sse(const httplib::Request& req, httplib::Response& res) { // 等待事件 bool result = session_dispatcher->wait_event(&sink); if (!result) { - std::cerr << "等待事件失败,关闭连接: " << session_id << std::endl; + LOG_WARNING("Failed to wait for event, closing connection: ", session_id); // 关闭会话分发器,但不清理资源 { @@ -454,10 +397,9 @@ void server::handle_sse(const httplib::Request& req, httplib::Response& res) { return false; } - return true; } catch (const std::exception& e) { - std::cerr << "SSE内容提供者异常: " << e.what() << std::endl; + LOG_ERROR("SSE content provider exception: ", e.what()); // 关闭会话分发器,但不清理资源 try { @@ -467,7 +409,7 @@ void server::handle_sse(const httplib::Request& req, httplib::Response& res) { it->second->close(); } } catch (const std::exception& e2) { - std::cerr << "关闭会话分发器时发生异常: " << e2.what() << std::endl; + LOG_ERROR("Exception while closing session dispatcher: ", e2.what()); } return false; @@ -491,20 +433,13 @@ void server::handle_jsonrpc(const httplib::Request& req, httplib::Response& res) // 获取会话ID auto it = req.params.find("session_id"); std::string session_id = it != req.params.end() ? it->second : ""; - - std::cerr << "收到JSON-RPC请求: 会话ID=" << session_id << ", 路径=" << req.path << std::endl; - std::cerr << "请求参数: "; - for (const auto& [key, value] : req.params) { - std::cerr << key << "=" << value << " "; - } - std::cerr << std::endl; // 解析请求 json req_json; try { req_json = json::parse(req.body); } catch (const json::exception& e) { - std::cerr << "解析JSON请求失败: " << e.what() << std::endl; + LOG_ERROR("Failed to parse JSON request: ", e.what()); res.status = 400; res.set_content("{\"error\":\"Invalid JSON\"}", "application/json"); return; @@ -522,7 +457,7 @@ void server::handle_jsonrpc(const httplib::Request& req, httplib::Response& res) res.set_content("Accepted", "text/plain"); return; } - std::cerr << "会话不存在: " << session_id << std::endl; + LOG_ERROR("Session not found: ", session_id); res.status = 404; res.set_content("{\"error\":\"Session not found\"}", "application/json"); return; @@ -542,7 +477,7 @@ void server::handle_jsonrpc(const httplib::Request& req, httplib::Response& res) mcp_req.params = req_json["params"]; } } catch (const std::exception& e) { - std::cerr << "创建请求对象失败: " << e.what() << std::endl; + LOG_ERROR("Failed to create request object: ", e.what()); res.status = 400; res.set_content("{\"error\":\"Invalid request format\"}", "application/json"); return; @@ -572,9 +507,7 @@ void server::handle_jsonrpc(const httplib::Request& req, httplib::Response& res) bool result = dispatcher->send_event(ss.str()); if (!result) { - std::cerr << "通过SSE发送响应失败: 会话ID=" << session_id << std::endl; - } else { - std::cerr << "成功通过SSE发送响应: 会话ID=" << session_id << std::endl; + LOG_ERROR("Failed to send response via SSE: session_id=", session_id); } }); @@ -586,19 +519,15 @@ void server::handle_jsonrpc(const httplib::Request& req, httplib::Response& res) json server::process_request(const request& req, const std::string& session_id) { // 检查是否是通知 if (req.is_notification()) { - std::cerr << "处理通知: " << req.method << std::endl; - // 通知没有响应 if (req.method == "notifications/initialized") { - std::cerr << "收到客户端initialized通知,会话: " << session_id << std::endl; set_session_initialized(session_id, true); - std::cerr << "会话已设置为初始化状态: " << session_id << std::endl; } return json::object(); } // 处理方法调用 try { - LOG_INFO("处理方法调用: ", req.method); + LOG_INFO("Processing method call: ", req.method); // 特殊情况:初始化 if (req.method == "initialize") { @@ -608,7 +537,7 @@ json server::process_request(const request& req, const std::string& session_id) } if (!is_session_initialized(session_id)) { - LOG_WARNING("会话未初始化: ", session_id); + LOG_WARNING("Session not initialized: ", session_id); return response::create_error( req.id, error_code::invalid_request, @@ -628,19 +557,19 @@ json server::process_request(const request& req, const std::string& session_id) if (handler) { // 调用处理器 - LOG_INFO("调用方法处理器: ", req.method); + LOG_INFO("Calling method handler: ", req.method); auto future = thread_pool_.enqueue([handler, params = req.params]() -> json { return handler(params); }); json result = future.get(); // 创建成功响应 - LOG_INFO("方法调用成功: ", req.method); + LOG_INFO("Method call successful: ", req.method); return response::create_success(req.id, result).to_json(); } // 方法未找到 - LOG_WARNING("方法未找到: ", req.method); + LOG_WARNING("Method not found: ", req.method); return response::create_error( req.id, error_code::method_not_found, @@ -648,7 +577,7 @@ json server::process_request(const request& req, const std::string& session_id) ).to_json(); } catch (const mcp_exception& e) { // MCP异常 - LOG_ERROR("MCP异常: ", e.what(), ", 代码: ", static_cast(e.code())); + LOG_ERROR("MCP exception: ", e.what(), ", code: ", static_cast(e.code())); return response::create_error( req.id, e.code(), @@ -656,7 +585,7 @@ json server::process_request(const request& req, const std::string& session_id) ).to_json(); } catch (const std::exception& e) { // 其他异常 - LOG_ERROR("处理请求时发生异常: ", e.what()); + LOG_ERROR("Exception while processing request: ", e.what()); return response::create_error( req.id, error_code::internal_error, @@ -664,7 +593,7 @@ json server::process_request(const request& req, const std::string& session_id) ).to_json(); } catch (...) { // 未知异常 - LOG_ERROR("处理请求时发生未知异常"); + LOG_ERROR("Unknown exception while processing request"); return response::create_error( req.id, error_code::internal_error, @@ -676,11 +605,9 @@ json server::process_request(const request& req, const std::string& session_id) json server::handle_initialize(const request& req, const std::string& session_id) { const json& params = req.params; - std::cerr << "处理initialize请求,会话ID: " << session_id << std::endl; - // Version negotiation if (!params.contains("protocolVersion") || !params["protocolVersion"].is_string()) { - std::cerr << "缺少protocolVersion参数或格式不正确" << std::endl; + LOG_ERROR("Missing or invalid protocolVersion parameter"); return response::create_error( req.id, error_code::invalid_params, @@ -689,10 +616,10 @@ json server::handle_initialize(const request& req, const std::string& session_id } std::string requested_version = params["protocolVersion"].get(); - std::cerr << "客户端请求的协议版本: " << requested_version << std::endl; + LOG_INFO("Client requested protocol version: ", requested_version); if (requested_version != MCP_VERSION) { - std::cerr << "不支持的协议版本: " << requested_version << ", 服务器支持: " << MCP_VERSION << std::endl; + LOG_ERROR("Unsupported protocol version: ", requested_version, ", server supports: ", MCP_VERSION); return response::create_error( req.id, error_code::invalid_params, @@ -718,7 +645,7 @@ json server::handle_initialize(const request& req, const std::string& session_id } // Log connection - std::cerr << "客户端连接: " << client_name << " " << client_version << std::endl; + LOG_INFO("Client connected: ", client_name, " ", client_version); // Return server info and capabilities json server_info = { @@ -732,7 +659,7 @@ json server::handle_initialize(const request& req, const std::string& session_id {"serverInfo", server_info} }; - std::cerr << "初始化成功,等待客户端发送notifications/initialized通知" << std::endl; + LOG_INFO("Initialization successful, waiting for notifications/initialized notification"); return response::create_success(req.id, result).to_json(); } @@ -743,9 +670,7 @@ void server::send_request(const std::string& session_id, const std::string& meth // Check if client is initialized or if this is an allowed method if (!is_allowed_before_init && !is_session_initialized(session_id)) { - // Client not initialized and method is not allowed before initialization - std::cerr << "Cannot send " << method << " request to session " << session_id - << " before it is initialized" << std::endl; + LOG_WARNING("Cannot send ", method, " request to session ", session_id, " before it is initialized"); return; } @@ -758,38 +683,33 @@ void server::send_request(const std::string& session_id, const std::string& meth std::lock_guard lock(mutex_); auto it = session_dispatchers_.find(session_id); if (it == session_dispatchers_.end()) { - std::cerr << "会话不存在: " << session_id << std::endl; + LOG_ERROR("Session not found: ", session_id); return; } dispatcher = it->second; } - // 发送请求 - 使用message事件类型,符合MCP规范 + // 发送请求 std::stringstream ss; ss << "event: message\ndata: " << req.to_json().dump() << "\n\n"; bool result = dispatcher->send_event(ss.str()); if (!result) { - std::cerr << "向会话发送请求失败: " << session_id << std::endl; - } else { - std::cerr << "成功向会话 " << session_id << " 发送请求: " << method << std::endl; + LOG_ERROR("Failed to send request to session: ", session_id); } } -// Check if a session is initialized bool server::is_session_initialized(const std::string& session_id) const { std::lock_guard lock(mutex_); auto it = session_initialized_.find(session_id); return (it != session_initialized_.end() && it->second); } -// Set session initialization status void server::set_session_initialized(const std::string& session_id, bool initialized) { std::lock_guard lock(mutex_); session_initialized_[session_id] = initialized; } -// Generate a random session ID in UUID format std::string server::generate_session_id() const { std::random_device rd; std::mt19937 gen(rd()); @@ -826,48 +746,4 @@ std::string server::generate_session_id() const { return ss.str(); } -void server::print_status() const { - std::lock_guard lock(mutex_); - - std::cerr << "=== MCP服务器状态 ===" << std::endl; - std::cerr << "服务器地址: " << host_ << ":" << port_ << std::endl; - std::cerr << "运行状态: " << (running_ ? "运行中" : "已停止") << std::endl; - std::cerr << "SSE端点: " << sse_endpoint_ << std::endl; - std::cerr << "消息端点前缀: " << msg_endpoint_ << std::endl; - - std::cerr << "活跃会话数: " << session_dispatchers_.size() << std::endl; - for (const auto& [session_id, dispatcher] : session_dispatchers_) { - std::cerr << " - 会话ID: " << session_id << ", 状态: " << (dispatcher->is_closed() ? "已关闭" : "活跃") << std::endl; - } - - std::cerr << "SSE线程数: " << sse_threads_.size() << std::endl; - - std::cerr << "注册的方法数: " << method_handlers_.size() << std::endl; - for (const auto& [method, _] : method_handlers_) { - std::cerr << " - " << method << std::endl; - } - - std::cerr << "注册的通知数: " << notification_handlers_.size() << std::endl; - for (const auto& [method, _] : notification_handlers_) { - std::cerr << " - " << method << std::endl; - } - - std::cerr << "注册的资源数: " << resources_.size() << std::endl; - for (const auto& [path, _] : resources_) { - std::cerr << " - " << path << std::endl; - } - - std::cerr << "注册的工具数: " << tools_.size() << std::endl; - for (const auto& [name, _] : tools_) { - std::cerr << " - " << name << std::endl; - } - - std::cerr << "已初始化的会话数: " << session_initialized_.size() << std::endl; - for (const auto& [session_id, initialized] : session_initialized_) { - std::cerr << " - " << session_id << ": " << (initialized ? "已初始化" : "未初始化") << std::endl; - } - - std::cerr << "======================" << std::endl; -} - } // namespace mcp \ No newline at end of file diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 017f2ae..a43f7a4 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -29,6 +29,9 @@ set(TEST_SOURCES test_mcp_client.cpp test_mcp_server.cpp test_mcp_direct_requests.cpp + test_mcp_lifecycle_transport.cpp + test_mcp_versioning.cpp + test_mcp_tools_extended.cpp ) # Create test executable @@ -56,4 +59,10 @@ enable_testing() add_test( NAME ${TEST_PROJECT_NAME} COMMAND ${TEST_PROJECT_NAME} +) + +# Add custom target to run tests +add_custom_target(run_tests + COMMAND ${CMAKE_CTEST_COMMAND} --verbose + DEPENDS ${TEST_PROJECT_NAME} ) \ No newline at end of file diff --git a/test/test_mcp_lifecycle_transport.cpp b/test/test_mcp_lifecycle_transport.cpp new file mode 100644 index 0000000..e7d812e --- /dev/null +++ b/test/test_mcp_lifecycle_transport.cpp @@ -0,0 +1,184 @@ +/** + * @file test_mcp_lifecycle_transport.cpp + * @brief 测试MCP消息生命周期和传输相关功能 + * + * 本文件包含对MCP消息生命周期和SSE传输的单元测试,基于规范2024-11-05。 + */ + +#include "mcp_message.h" +#include "mcp_server.h" +#include "mcp_client.h" +#include +#include +#include +#include +#include +#include + +// 测试类,用于设置服务器和客户端 +class McpLifecycleTransportTest : public ::testing::Test { +protected: + void SetUp() override { + // 创建服务器 + server = std::make_unique("localhost", 8096); + server->set_server_info("TestServer", "2024-11-05"); + + // 设置服务器能力 + mcp::json capabilities = { + {"tools", {{"listChanged", true}}}, + {"transport", {{"sse", true}}} + }; + server->set_capabilities(capabilities); + + // 注册一个简单的方法 + server->register_method("test_method", [](const mcp::json& params) -> mcp::json { + return {{"result", "success"}, {"params_received", params}}; + }); + + // 注册一个通知处理器 + server->register_notification("test_notification", [this](const mcp::json& params) { + notification_received = true; + notification_params = params; + }); + } + + void TearDown() override { + // 停止服务器 + if (server && server_thread.joinable()) { + server->stop(); + server_thread.join(); + } + } + + // 启动服务器 + void start_server() { + server_thread = std::thread([this]() { + server->start(false); + }); + // 等待服务器启动 + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + std::unique_ptr server; + std::thread server_thread; + bool notification_received = false; + mcp::json notification_params; +}; + +// 测试消息生命周期 - 初始化 +TEST_F(McpLifecycleTransportTest, InitializationTest) { + // 启动服务器 + start_server(); + + // 创建客户端 + mcp::client client("localhost", 8096); + client.set_timeout(5); + + // 测试初始化 + bool init_result = client.initialize("TestClient", "1.0.0"); + EXPECT_TRUE(init_result); + + // 获取服务器能力 + mcp::json server_capabilities = client.get_server_capabilities(); + EXPECT_TRUE(server_capabilities.contains("tools")); + EXPECT_TRUE(server_capabilities.contains("transport")); + EXPECT_TRUE(server_capabilities["transport"]["sse"].get()); +} + +// 测试消息生命周期 - 请求和响应 +TEST_F(McpLifecycleTransportTest, RequestResponseTest) { + // 启动服务器 + start_server(); + + // 创建客户端 + mcp::client client("localhost", 8096); + client.set_timeout(5); + client.initialize("TestClient", "1.0.0"); + + // 发送请求并获取响应 + mcp::json params = {{"key", "value"}, {"number", 42}}; + mcp::response response = client.send_request("test_method", params); + + // 验证响应 + EXPECT_FALSE(response.is_error()); + EXPECT_EQ(response.result["result"], "success"); + EXPECT_EQ(response.result["params_received"]["key"], "value"); + EXPECT_EQ(response.result["params_received"]["number"], 42); +} + +// 测试消息生命周期 - 通知 +TEST_F(McpLifecycleTransportTest, NotificationTest) { + // 启动服务器 + start_server(); + + // 创建客户端 + mcp::client client("localhost", 8096); + client.set_timeout(5); + client.initialize("TestClient", "1.0.0"); + + // 发送通知 + mcp::json params = {{"event", "update"}, {"status", "completed"}}; + client.send_notification("test_notification", params); + + // 等待通知处理 + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + // 验证通知已接收 + EXPECT_TRUE(notification_received); + EXPECT_EQ(notification_params["event"], "update"); + EXPECT_EQ(notification_params["status"], "completed"); +} + +// 测试SSE传输 - 使用ping方法测试SSE连接 +TEST_F(McpLifecycleTransportTest, SseTransportTest) { + // 启动服务器 + start_server(); + + // 注册一个特殊的方法,用于测试SSE连接 + server->register_method("sse_test", [](const mcp::json& params) -> mcp::json { + return {{"sse_test_result", true}}; + }); + + // 创建客户端 + mcp::client client("localhost", 8096); + client.set_timeout(5); + client.initialize("TestClient", "1.0.0"); + + // 等待SSE连接建立 + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + // 测试SSE连接是否正常工作 - 使用ping方法 + bool ping_result = client.ping(); + EXPECT_TRUE(ping_result); + + // 发送请求并获取响应,验证SSE连接正常工作 + mcp::response response = client.send_request("sse_test"); + EXPECT_FALSE(response.is_error()); + EXPECT_TRUE(response.result["sse_test_result"].get()); +} + +// 测试ping功能 +TEST_F(McpLifecycleTransportTest, PingTest) { + // 启动服务器 + start_server(); + + // 创建客户端 + mcp::client client("localhost", 8096); + client.set_timeout(5); + client.initialize("TestClient", "1.0.0"); + + // 测试ping + bool ping_result = client.ping(); + EXPECT_TRUE(ping_result); + + // 停止服务器 + server->stop(); + server_thread.join(); + + // 等待服务器完全停止 + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + // 再次测试ping,应该失败 + ping_result = client.ping(); + EXPECT_FALSE(ping_result); +} \ No newline at end of file diff --git a/test/test_mcp_tool.cpp b/test/test_mcp_tool.cpp index 5913a12..9a89d0f 100644 --- a/test/test_mcp_tool.cpp +++ b/test/test_mcp_tool.cpp @@ -39,11 +39,11 @@ TEST(McpToolTest, ToolStructTest) { mcp::json json_tool = tool.to_json(); EXPECT_EQ(json_tool["name"], "test_tool"); EXPECT_EQ(json_tool["description"], "测试工具"); - EXPECT_EQ(json_tool["parameters"]["type"], "object"); - EXPECT_EQ(json_tool["parameters"]["properties"]["param1"]["type"], "string"); - EXPECT_EQ(json_tool["parameters"]["properties"]["param1"]["description"], "第一个参数"); - EXPECT_EQ(json_tool["parameters"]["properties"]["param2"]["type"], "number"); - EXPECT_EQ(json_tool["parameters"]["required"][0], "param1"); + EXPECT_EQ(json_tool["inputSchema"]["type"], "object"); + EXPECT_EQ(json_tool["inputSchema"]["properties"]["param1"]["type"], "string"); + EXPECT_EQ(json_tool["inputSchema"]["properties"]["param1"]["description"], "第一个参数"); + EXPECT_EQ(json_tool["inputSchema"]["properties"]["param2"]["type"], "number"); + EXPECT_EQ(json_tool["inputSchema"]["required"][0], "param1"); } // 测试工具构建器 diff --git a/test/test_mcp_tools_extended.cpp b/test/test_mcp_tools_extended.cpp new file mode 100644 index 0000000..f4eeff5 --- /dev/null +++ b/test/test_mcp_tools_extended.cpp @@ -0,0 +1,357 @@ +/** + * @file test_mcp_tools_extended.cpp + * @brief 测试MCP工具相关功能的扩展测试 + * + * 本文件包含对MCP工具模块的扩展单元测试,基于规范2024-11-05。 + */ + +#include "mcp_tool.h" +#include "mcp_server.h" +#include "mcp_client.h" +#include +#include +#include +#include +#include +#include + +// 测试类,用于设置服务器和客户端 +class McpToolsExtendedTest : public ::testing::Test { +protected: + void SetUp() override { + // 创建服务器 + server = std::make_unique("localhost", 8098); + server->set_server_info("TestServer", "2024-11-05"); + + // 设置服务器能力 + mcp::json capabilities = { + {"tools", {{"listChanged", true}}} + }; + server->set_capabilities(capabilities); + + // 注册计算器工具 + mcp::tool calculator = mcp::tool_builder("calculator") + .with_description("计算器工具") + .with_string_param("operation", "操作类型 (add, subtract, multiply, divide)") + .with_number_param("a", "第一个操作数") + .with_number_param("b", "第二个操作数") + .build(); + + server->register_tool(calculator, [](const mcp::json& params) -> mcp::json { + std::string operation = params["operation"]; + double a = params["a"]; + double b = params["b"]; + double result = 0; + + if (operation == "add") { + result = a + b; + } else if (operation == "subtract") { + result = a - b; + } else if (operation == "multiply") { + result = a * b; + } else if (operation == "divide") { + if (b == 0) { + return {{"error", "除数不能为零"}}; + } + result = a / b; + } else { + return {{"error", "未知操作: " + operation}}; + } + + return {{"result", result}}; + }); + + // 注册文本处理工具 + mcp::tool text_processor = mcp::tool_builder("text_processor") + .with_description("文本处理工具") + .with_string_param("text", "要处理的文本") + .with_string_param("operation", "操作类型 (uppercase, lowercase, reverse)") + .build(); + + server->register_tool(text_processor, [](const mcp::json& params) -> mcp::json { + std::string text = params["text"]; + std::string operation = params["operation"]; + std::string result; + + if (operation == "uppercase") { + result = text; + std::transform(result.begin(), result.end(), result.begin(), ::toupper); + } else if (operation == "lowercase") { + result = text; + std::transform(result.begin(), result.end(), result.begin(), ::tolower); + } else if (operation == "reverse") { + result = text; + std::reverse(result.begin(), result.end()); + } else { + return {{"error", "未知操作: " + operation}}; + } + + return {{"result", result}}; + }); + + // 注册列表处理工具 + mcp::tool list_processor = mcp::tool_builder("list_processor") + .with_description("列表处理工具") + .with_array_param("items", "要处理的项目列表", "string") + .with_string_param("operation", "操作类型 (sort, reverse, count)") + .build(); + + server->register_tool(list_processor, [](const mcp::json& params) -> mcp::json { + auto items = params["items"].get>(); + std::string operation = params["operation"]; + + if (operation == "sort") { + std::sort(items.begin(), items.end()); + return {{"result", items}}; + } else if (operation == "reverse") { + std::reverse(items.begin(), items.end()); + return {{"result", items}}; + } else if (operation == "count") { + return {{"result", items.size()}}; + } else { + return {{"error", "未知操作: " + operation}}; + } + }); + } + + void TearDown() override { + // 停止服务器 + if (server && server_thread.joinable()) { + server->stop(); + server_thread.join(); + } + } + + // 启动服务器 + void start_server() { + server_thread = std::thread([this]() { + server->start(false); + }); + // 等待服务器启动 + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + std::unique_ptr server; + std::thread server_thread; +}; + +// 测试获取工具列表 +TEST_F(McpToolsExtendedTest, GetToolsTest) { + // 启动服务器 + start_server(); + + // 创建客户端 + mcp::client client("localhost", 8098); + client.set_timeout(5); + client.initialize("TestClient", mcp::MCP_VERSION); + + // 获取工具列表 + auto tools = client.get_tools(); + + // 验证工具列表 + EXPECT_EQ(tools.size(), 3); + + // 验证工具名称 + std::vector tool_names; + for (const auto& tool : tools) { + tool_names.push_back(tool.name); + } + + EXPECT_THAT(tool_names, ::testing::UnorderedElementsAre("calculator", "text_processor", "list_processor")); +} + +// 测试调用计算器工具 +TEST_F(McpToolsExtendedTest, CalculatorToolTest) { + // 启动服务器 + start_server(); + + // 创建客户端 + mcp::client client("localhost", 8098); + client.set_timeout(5); + client.initialize("TestClient", mcp::MCP_VERSION); + + // 调用加法 + mcp::json add_result = client.call_tool("calculator", { + {"operation", "add"}, + {"a", 5}, + {"b", 3} + }); + EXPECT_EQ(add_result["result"], 8); + + // 调用减法 + mcp::json subtract_result = client.call_tool("calculator", { + {"operation", "subtract"}, + {"a", 10}, + {"b", 4} + }); + EXPECT_EQ(subtract_result["result"], 6); + + // 调用乘法 + mcp::json multiply_result = client.call_tool("calculator", { + {"operation", "multiply"}, + {"a", 6}, + {"b", 7} + }); + EXPECT_EQ(multiply_result["result"], 42); + + // 调用除法 + mcp::json divide_result = client.call_tool("calculator", { + {"operation", "divide"}, + {"a", 20}, + {"b", 5} + }); + EXPECT_EQ(divide_result["result"], 4); + + // 测试除以零 + mcp::json divide_by_zero = client.call_tool("calculator", { + {"operation", "divide"}, + {"a", 10}, + {"b", 0} + }); + EXPECT_TRUE(divide_by_zero.contains("error")); +} + +// 测试调用文本处理工具 +TEST_F(McpToolsExtendedTest, TextProcessorToolTest) { + // 启动服务器 + start_server(); + + // 创建客户端 + mcp::client client("localhost", 8098); + client.set_timeout(5); + client.initialize("TestClient", mcp::MCP_VERSION); + + // 测试转大写 + mcp::json uppercase_result = client.call_tool("text_processor", { + {"text", "Hello World"}, + {"operation", "uppercase"} + }); + EXPECT_EQ(uppercase_result["result"], "HELLO WORLD"); + + // 测试转小写 + mcp::json lowercase_result = client.call_tool("text_processor", { + {"text", "Hello World"}, + {"operation", "lowercase"} + }); + EXPECT_EQ(lowercase_result["result"], "hello world"); + + // 测试反转 + mcp::json reverse_result = client.call_tool("text_processor", { + {"text", "Hello World"}, + {"operation", "reverse"} + }); + EXPECT_EQ(reverse_result["result"], "dlroW olleH"); +} + +// 测试调用列表处理工具 +TEST_F(McpToolsExtendedTest, ListProcessorToolTest) { + // 启动服务器 + start_server(); + + // 创建客户端 + mcp::client client("localhost", 8098); + client.set_timeout(5); + client.initialize("TestClient", mcp::MCP_VERSION); + + // 准备测试数据 + std::vector items = {"banana", "apple", "orange", "grape"}; + + // 测试排序 + mcp::json sort_result = client.call_tool("list_processor", { + {"items", items}, + {"operation", "sort"} + }); + std::vector sorted_items = sort_result["result"]; + EXPECT_THAT(sorted_items, ::testing::ElementsAre("apple", "banana", "grape", "orange")); + + // 测试反转 + mcp::json reverse_result = client.call_tool("list_processor", { + {"items", items}, + {"operation", "reverse"} + }); + std::vector reversed_items = reverse_result["result"]; + EXPECT_THAT(reversed_items, ::testing::ElementsAre("grape", "orange", "apple", "banana")); + + // 测试计数 + mcp::json count_result = client.call_tool("list_processor", { + {"items", items}, + {"operation", "count"} + }); + EXPECT_EQ(count_result["result"], 4); +} + +// 测试工具参数验证 +TEST_F(McpToolsExtendedTest, ToolParameterValidationTest) { + // 启动服务器 + start_server(); + + // 创建客户端 + mcp::client client("localhost", 8098); + client.set_timeout(5); + client.initialize("TestClient", mcp::MCP_VERSION); + + // 测试缺少必需参数 + try { + client.call_tool("calculator", { + {"a", 5} + // 缺少 operation 和 b + }); + FAIL() << "应该抛出异常,因为缺少必需参数"; + } catch (const mcp::mcp_exception& e) { + EXPECT_EQ(e.code(), mcp::error_code::invalid_params); + } + + // 测试参数类型错误 + try { + client.call_tool("calculator", { + {"operation", "add"}, + {"a", "not_a_number"}, // 应该是数字 + {"b", 3} + }); + FAIL() << "应该抛出异常,因为参数类型错误"; + } catch (const mcp::mcp_exception& e) { + EXPECT_EQ(e.code(), mcp::error_code::invalid_params); + } +} + +// 测试工具注册和注销 +TEST_F(McpToolsExtendedTest, ToolRegistrationAndUnregistrationTest) { + // 创建工具注册表 + mcp::tool_registry& registry = mcp::tool_registry::instance(); + + // 创建一个测试工具 + mcp::tool test_tool = mcp::tool_builder("test_tool") + .with_description("测试工具") + .with_string_param("input", "输入参数") + .build(); + + // 注册工具 + registry.register_tool(test_tool, [](const mcp::json& params) -> mcp::json { + return {{"output", "处理结果: " + params["input"].get()}}; + }); + + // 验证工具已注册 + auto registered_tool = registry.get_tool("test_tool"); + ASSERT_NE(registered_tool, nullptr); + EXPECT_EQ(registered_tool->first.name, "test_tool"); + EXPECT_EQ(registered_tool->first.description, "测试工具"); + + // 调用工具 + mcp::json result = registry.call_tool("test_tool", {{"input", "测试输入"}}); + EXPECT_EQ(result["output"], "处理结果: 测试输入"); + + // 注销工具 + bool unregistered = registry.unregister_tool("test_tool"); + EXPECT_TRUE(unregistered); + + // 验证工具已注销 + EXPECT_EQ(registry.get_tool("test_tool"), nullptr); + + // 尝试调用已注销的工具 + try { + registry.call_tool("test_tool", {{"input", "测试输入"}}); + FAIL() << "应该抛出异常,因为工具已注销"; + } catch (const mcp::mcp_exception& e) { + EXPECT_EQ(e.code(), mcp::error_code::method_not_found); + } +} \ No newline at end of file diff --git a/test/test_mcp_versioning.cpp b/test/test_mcp_versioning.cpp new file mode 100644 index 0000000..6d18fbd --- /dev/null +++ b/test/test_mcp_versioning.cpp @@ -0,0 +1,140 @@ +/** + * @file test_mcp_versioning.cpp + * @brief 测试MCP版本控制相关功能 + * + * 本文件包含对MCP版本控制的单元测试,基于规范2024-11-05。 + */ + +#include "mcp_message.h" +#include "mcp_server.h" +#include "mcp_client.h" +#include +#include +#include +#include +#include +#include + +// 测试类,用于设置服务器和客户端 +class McpVersioningTest : public ::testing::Test { +protected: + void SetUp() override { + // 创建服务器 + server = std::make_unique("localhost", 8097); + server->set_server_info("TestServer", "2024-11-05"); + + // 设置服务器能力 + mcp::json capabilities = { + {"tools", {{"listChanged", true}}}, + {"transport", {{"sse", true}}} + }; + server->set_capabilities(capabilities); + } + + void TearDown() override { + // 停止服务器 + if (server && server_thread.joinable()) { + server->stop(); + server_thread.join(); + } + } + + // 启动服务器 + void start_server() { + server_thread = std::thread([this]() { + server->start(false); + }); + // 等待服务器启动 + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + std::unique_ptr server; + std::thread server_thread; +}; + +// 测试版本常量 +TEST(McpVersioningTest, VersionConstantTest) { + // 验证MCP版本常量 + EXPECT_EQ(std::string(mcp::MCP_VERSION), "2024-11-05"); +} + +// 测试版本匹配 +TEST_F(McpVersioningTest, VersionMatchTest) { + // 启动服务器 + start_server(); + + // 创建客户端,使用匹配的版本 + mcp::client client("localhost", 8097); + client.set_timeout(5); + + // 测试初始化,应该成功 + bool init_result = client.initialize("TestClient", mcp::MCP_VERSION); + EXPECT_TRUE(init_result); +} + +// 测试版本不匹配 +TEST_F(McpVersioningTest, VersionMismatchTest) { + // 启动服务器 + start_server(); + + // 创建客户端,使用不匹配的版本 + mcp::client client("localhost", 8097); + client.set_timeout(5); + + // 测试初始化,应该失败或返回警告 + // 注意:根据实际实现,这可能会成功但有警告,或者完全失败 + bool init_result = client.initialize("TestClient", "2023-01-01"); + + // 如果实现允许版本不匹配,这个测试可能需要调整 + // 这里我们假设实现会拒绝不匹配的版本 + EXPECT_FALSE(init_result); +} + +// 测试服务器版本信息 +TEST_F(McpVersioningTest, ServerVersionInfoTest) { + // 启动服务器 + start_server(); + + // 创建客户端 + mcp::client client("localhost", 8097); + client.set_timeout(5); + client.initialize("TestClient", mcp::MCP_VERSION); + + // 获取服务器信息 + mcp::response response = client.send_request("server/info"); + + // 验证服务器信息 + EXPECT_FALSE(response.is_error()); + EXPECT_EQ(response.result["name"], "TestServer"); + EXPECT_EQ(response.result["version"], "2024-11-05"); + EXPECT_TRUE(response.result.contains("capabilities")); +} + +// 测试客户端版本信息 +TEST_F(McpVersioningTest, ClientVersionInfoTest) { + // 创建一个处理器来捕获初始化请求 + mcp::json captured_init_params; + server->register_method("initialize", [&captured_init_params](const mcp::json& params) -> mcp::json { + captured_init_params = params; + return { + {"name", "TestServer"}, + {"version", "2024-11-05"}, + {"capabilities", { + {"tools", {{"listChanged", true}}}, + {"transport", {{"sse", true}}} + }} + }; + }); + + // 启动服务器 + start_server(); + + // 创建客户端 + mcp::client client("localhost", 8097); + client.set_timeout(5); + client.initialize("TestClient", "1.0.0"); + + // 验证客户端版本信息 + EXPECT_EQ(captured_init_params["name"], "TestClient"); + EXPECT_EQ(captured_init_params["version"], "1.0.0"); +} \ No newline at end of file