From 4f2f474c280e85eeeede7592305588a248ae2d21 Mon Sep 17 00:00:00 2001 From: hkr04 Date: Wed, 12 Mar 2025 01:29:43 +0800 Subject: [PATCH] SSE: pass example; fail on cursor; note in mess --- examples/client_example.cpp | 2 +- examples/server_example.cpp | 4 +- include/mcp_client.h | 12 +- include/mcp_server.h | 90 ++++++-- src/CMakeLists.txt | 4 - src/mcp_client.cpp | 196 ++++++++++++++++- src/mcp_server.cpp | 405 ++++++++++++++++++++++++++---------- 7 files changed, 565 insertions(+), 148 deletions(-) diff --git a/examples/client_example.cpp b/examples/client_example.cpp index ed05462..62fbc77 100644 --- a/examples/client_example.cpp +++ b/examples/client_example.cpp @@ -12,7 +12,7 @@ int main() { // Create a client - mcp::client client("localhost", 8080); + mcp::client client("localhost", 8089); // Set capabilites mcp::json capabilities = { diff --git a/examples/server_example.cpp b/examples/server_example.cpp index a4292ad..f995ab0 100644 --- a/examples/server_example.cpp +++ b/examples/server_example.cpp @@ -122,7 +122,7 @@ int main() { std::filesystem::create_directories("./files"); // Create and configure server - mcp::server server("localhost", 8080); + mcp::server server("localhost", 8089); server.set_server_info("ExampleServer", "2024-11-05"); // Set server capabilities @@ -165,7 +165,7 @@ int main() { // server.register_resource("/api", api_resource); // Start server - std::cout << "Starting MCP server at localhost:8080..." << std::endl; + std::cout << "Starting MCP server at localhost:8089..." << std::endl; std::cout << "Press Ctrl+C to stop the server" << std::endl; server.start(true); // Blocking mode diff --git a/include/mcp_client.h b/include/mcp_client.h index b14a9b0..e4bfac6 100644 --- a/include/mcp_client.h +++ b/include/mcp_client.h @@ -22,6 +22,7 @@ #include #include #include +#include namespace mcp { @@ -43,7 +44,7 @@ public: /** * @brief Constructor - * @param base_url The base URL of the server (e.g., "http://localhost:8080") + * @param base_url The base URL of the server (e.g., "localhost:8080") * @param capabilities The capabilities of the client */ client(const std::string& base_url, const json& capabilities = json::object(), const std::string& sse_endpoint = "/sse"); @@ -165,6 +166,12 @@ public: */ json list_resource_templates(); + /** + * @brief 检查服务器是否可访问 + * @return True if the server is accessible + */ + bool check_server_accessible(); + private: std::string base_url_; std::string host_; @@ -184,6 +191,9 @@ private: // Mutex for thread safety mutable std::mutex mutex_; + + // 条件变量,用于等待消息端点设置 + std::condition_variable endpoint_cv_; // SSE connection std::unique_ptr sse_thread_; diff --git a/include/mcp_server.h b/include/mcp_server.h index 2c15528..7e5e91e 100644 --- a/include/mcp_server.h +++ b/include/mcp_server.h @@ -33,26 +33,77 @@ class event_dispatcher { public: event_dispatcher() = default; - void wait_event(httplib::DataSink* sink) { + bool wait_event(httplib::DataSink* sink, const std::chrono::milliseconds& timeout = std::chrono::milliseconds(30000)) { + if (!sink) { + return false; + } + std::unique_lock lk(m_); + + // 如果连接已关闭,返回false + if (closed_) { + return false; + } + int id = id_; - cv_.wait(lk, [&] { return cid_ == id; }); - sink->write(message_.data(), message_.size()); + + // 使用超时等待 + bool result = cv_.wait_for(lk, timeout, [&] { + return cid_ == id || closed_; + }); + + // 如果连接已关闭或等待超时,返回false + if (closed_) { + return false; + } + + if (!result) { + std::cerr << "等待事件超时" << std::endl; + return false; + } + + // 写入数据 + try { + sink->write(message_.data(), message_.size()); + return true; + } catch (const std::exception& e) { + std::cerr << "写入事件数据失败: " << e.what() << std::endl; + closed_ = true; + return false; + } } void send_event(const std::string& message) { std::lock_guard lk(m_); + + // 如果连接已关闭,抛出异常 + if (closed_) { + throw std::runtime_error("连接已关闭"); + } + cid_ = id_++; message_ = message; cv_.notify_all(); } + + void close() { + std::lock_guard lk(m_); + closed_ = true; + cv_.notify_all(); + } + + bool is_closed() const { + std::lock_guard lk(m_); + return closed_; + } private: - std::mutex m_; + mutable std::mutex m_; std::condition_variable cv_; std::atomic id_{0}; std::atomic cid_{-1}; std::string message_; + bool closed_ = false; }; /** @@ -71,7 +122,7 @@ public: */ server(const std::string& host = "localhost", int port = 8080, const std::string& sse_endpoint = "/sse", - const std::string& msg_endpoint_prefix = "/message/"); + const std::string& msg_endpoint_prefix = "/message"); /** * @brief Destructor @@ -151,14 +202,21 @@ public: /** * @brief Send a request to a client - * @param client_address The address of the client + * @param session_id The session ID of the client * @param method The method to call * @param params The parameters to pass * * This method will only send requests other than ping and logging * after the client has sent the initialized notification. */ - void send_request(const std::string& client_address, const std::string& method, const json& params = json::object()); + void send_request(const std::string& session_id, const std::string& method, const json& params = json::object()); + + /** + * @brief 打印服务器状态 + * + * 打印当前服务器的状态,包括活跃的会话、注册的方法等 + */ + void print_status() const; private: std::string host_; @@ -184,7 +242,7 @@ private: // Server-sent events endpoint std::string sse_endpoint_; - std::string msg_endpoint_prefix_; + std::string msg_endpoint_; // Method handlers std::map> method_handlers_; @@ -207,8 +265,8 @@ private: // Running flag bool running_ = false; - // Map to track client initialization status (client_address -> initialized) - std::map client_initialized_; + // Map to track session initialization status (session_id -> initialized) + std::map session_initialized_; // Handle SSE requests void handle_sse(const httplib::Request& req, httplib::Response& res); @@ -217,16 +275,16 @@ private: void handle_jsonrpc(const httplib::Request& req, httplib::Response& res); // Process a JSON-RPC request - json process_request(const request& req, const std::string& client_address); + json process_request(const request& req, const std::string& session_id); // Handle initialization request - json handle_initialize(const request& req, const std::string& client_address); + json handle_initialize(const request& req); - // Check if a client is initialized - bool is_client_initialized(const std::string& client_address) const; + // Check if a session is initialized + bool is_session_initialized(const std::string& session_id) const; - // Set client initialization status - void set_client_initialized(const std::string& client_address, bool initialized); + // Set session initialization status + void set_session_initialized(const std::string& session_id, bool initialized); // Generate a random session ID std::string generate_session_id() const; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ab7f9da..b860160 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -11,10 +11,6 @@ add_library(${TARGET} STATIC ../include/mcp_server.h mcp_tool.cpp ../include/mcp_tool.h - mcp_sse_server.cpp - ../include/mcp_sse_server.h - mcp_sse_client.cpp - ../include/mcp_sse_client.h ) target_link_libraries(${TARGET} PUBLIC diff --git a/src/mcp_client.cpp b/src/mcp_client.cpp index 06839b3..b5c6d48 100644 --- a/src/mcp_client.cpp +++ b/src/mcp_client.cpp @@ -50,6 +50,14 @@ void client::init_client(const std::string& base_url) { } bool client::initialize(const std::string& client_name, const std::string& client_version) { + std::cerr << "开始初始化MCP客户端..." << std::endl; + + // 检查服务器是否可访问 + if (!check_server_accessible()) { + std::cerr << "服务器不可访问,初始化失败" << std::endl; + return false; + } + // Create initialization request request req = request::create("initialize", { {"protocolVersion", MCP_VERSION}, @@ -61,15 +69,60 @@ bool client::initialize(const std::string& client_name, const std::string& clien }); try { + // 打开SSE连接 + std::cerr << "正在打开SSE连接..." << std::endl; open_sse_connection(); + + // 等待SSE连接建立并获取消息端点 + // 使用条件变量和超时机制 + const auto timeout = std::chrono::milliseconds(5000); // 5秒超时 + + { + std::unique_lock lock(mutex_); + + // 检查初始状态 + if (!msg_endpoint_.empty()) { + std::cerr << "消息端点已经设置: " << msg_endpoint_ << std::endl; + } else { + std::cerr << "等待条件变量..." << std::endl; + } + + bool success = endpoint_cv_.wait_for(lock, timeout, [this]() { + if (!sse_running_) { + std::cerr << "SSE连接已关闭,停止等待" << std::endl; + return true; + } + if (!msg_endpoint_.empty()) { + std::cerr << "消息端点已设置,停止等待" << std::endl; + return true; + } + return false; + }); + + // 检查等待结果 + if (!success) { + std::cerr << "条件变量等待超时" << std::endl; + } + + // 如果SSE连接已关闭或等待超时,抛出异常 + if (!sse_running_) { + throw std::runtime_error("SSE连接已关闭,未能获取消息端点"); + } + + if (msg_endpoint_.empty()) { + throw std::runtime_error("等待SSE连接超时,未能获取消息端点"); + } + + std::cerr << "成功获取消息端点: " << msg_endpoint_ << std::endl; + } - // Send the request + // 发送初始化请求 json result = send_jsonrpc(req); - // Store server capabilities + // 存储服务器能力 server_capabilities_ = result["capabilities"]; - // Send initialized notification + // 发送已初始化通知 request notification = request::create_notification("initialized"); send_jsonrpc(notification); @@ -211,6 +264,24 @@ void client::open_sse_connection() { // 设置SSE连接状态为运行中 sse_running_ = true; + // 清空消息端点 + { + std::lock_guard lock(mutex_); + msg_endpoint_.clear(); + + // 通知等待的线程(虽然消息端点为空,但可以让等待的线程检查sse_running_状态) + endpoint_cv_.notify_all(); + } + + // 打印连接信息(调试用) + std::string connection_info; + if (!base_url_.empty()) { + connection_info = "Base URL: " + base_url_ + ", SSE Endpoint: " + sse_endpoint_; + } else { + connection_info = "Host: " + host_ + ", Port: " + std::to_string(port_) + ", SSE Endpoint: " + sse_endpoint_; + } + std::cerr << "尝试建立SSE连接: " << connection_info << std::endl; + // 创建并启动SSE线程 sse_thread_ = std::make_unique([this]() { int retry_count = 0; @@ -220,10 +291,14 @@ void client::open_sse_connection() { while (sse_running_) { try { // 尝试建立SSE连接 + std::cerr << "SSE线程: 尝试连接到 " << sse_endpoint_ << std::endl; + auto res = http_client_->Get(sse_endpoint_.c_str(), [this](const char *data, size_t data_length) { // 解析SSE数据 + std::cerr << "SSE线程: 收到数据,长度: " << data_length << std::endl; if (!parse_sse_data(data, data_length)) { + std::cerr << "SSE线程: 解析数据失败" << std::endl; return false; // 解析失败,关闭连接 } return sse_running_.load(); // 如果sse_running_为false,关闭连接 @@ -231,11 +306,37 @@ void client::open_sse_connection() { // 检查连接是否成功 if (!res) { - throw std::runtime_error("SSE连接失败: " + std::to_string(static_cast(res.error()))); + std::string error_msg = "SSE连接失败: "; + error_msg += "错误代码: " + std::to_string(static_cast(res.error())); + + // 添加更详细的错误信息 + switch (res.error()) { + case httplib::Error::Connection: + error_msg += " (连接错误,服务器可能未运行或无法访问)"; + break; + case httplib::Error::Read: + error_msg += " (读取错误,服务器可能关闭了连接或响应格式不正确)"; + break; + case httplib::Error::Write: + error_msg += " (写入错误)"; + break; + case httplib::Error::ConnectionTimeout: + error_msg += " (连接超时)"; + break; + case httplib::Error::Canceled: + error_msg += " (请求被取消)"; + break; + default: + error_msg += " (未知错误)"; + break; + } + + throw std::runtime_error(error_msg); } // 连接成功后重置重试计数 retry_count = 0; + std::cerr << "SSE线程: 连接成功" << std::endl; } catch (const std::exception& e) { // 记录错误 std::cerr << "SSE连接错误: " << e.what() << std::endl; @@ -248,9 +349,12 @@ void client::open_sse_connection() { // 指数退避重试 int delay = retry_delay_base * (1 << (retry_count - 1)); // 2^(retry_count-1) * base_delay + std::cerr << "将在 " << delay << " 毫秒后重试 (尝试 " << retry_count << "/" << max_retries << ")" << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(delay)); } } + + std::cerr << "SSE线程: 退出" << std::endl; }); } @@ -284,6 +388,9 @@ bool client::parse_sse_data(const char* data, size_t length) { { std::lock_guard lock(mutex_); msg_endpoint_ = data_content; + + // 通知等待的线程 + endpoint_cv_.notify_all(); } return true; @@ -300,11 +407,28 @@ void client::close_sse_connection() { if (sse_thread_ && sse_thread_->joinable()) { sse_thread_->join(); } + + // 清空消息端点 + { + std::lock_guard lock(mutex_); + msg_endpoint_.clear(); + + // 通知等待的线程(虽然消息端点为空,但可以让等待的线程检查sse_running_状态) + endpoint_cv_.notify_all(); + } } json client::send_jsonrpc(const request& req) { std::lock_guard lock(mutex_); + // 检查消息端点是否已设置 + if (msg_endpoint_.empty()) { + throw mcp_exception(error_code::internal_error, "消息端点未设置,SSE连接可能未建立"); + } + + // 打印请求信息(调试用) + std::cerr << "发送JSON-RPC请求: 方法=" << req.method << ", 端点=" << msg_endpoint_ << std::endl; + // Convert request to JSON json req_json = req.to_json(); std::string req_body = req_json.dump(); @@ -324,20 +448,28 @@ json client::send_jsonrpc(const request& req) { if (!result) { // Error occurred auto err = result.error(); + std::string error_msg; switch (err) { case httplib::Error::Connection: - throw mcp_exception(error_code::server_error_start, "Connection error"); + error_msg = "连接错误,服务器可能未运行或无法访问"; + break; case httplib::Error::Read: - throw mcp_exception(error_code::internal_error, "Read error"); + error_msg = "读取错误,服务器可能关闭了连接或响应格式不正确"; + break; case httplib::Error::Write: - throw mcp_exception(error_code::internal_error, "Write error"); + error_msg = "写入错误"; + break; case httplib::Error::ConnectionTimeout: - throw mcp_exception(error_code::server_error_start, "Timeout error"); + error_msg = "连接超时"; + break; default: - throw mcp_exception(error_code::internal_error, - "HTTP client error: " + std::to_string(static_cast(err))); + error_msg = "HTTP客户端错误: " + std::to_string(static_cast(err)); + break; } + + std::cerr << "JSON-RPC请求失败: " << error_msg << std::endl; + throw mcp_exception(error_code::internal_error, error_msg); } // Check if it's a notification (no response expected) @@ -349,6 +481,9 @@ json client::send_jsonrpc(const request& req) { try { json res_json = json::parse(result->body); + // 打印响应信息(调试用) + std::cerr << "收到JSON-RPC响应: " << res_json.dump() << std::endl; + // Check for error if (res_json.contains("error")) { int code = res_json["error"]["code"]; @@ -369,4 +504,45 @@ json client::send_jsonrpc(const request& req) { } } +bool client::check_server_accessible() { + std::cerr << "检查服务器是否可访问..." << std::endl; + + try { + // 尝试发送一个简单的GET请求到服务器 + auto res = http_client_->Get("/"); + + if (res) { + std::cerr << "服务器可访问,状态码: " << res->status << std::endl; + return true; + } else { + std::string error_msg = "服务器不可访问,错误代码: " + std::to_string(static_cast(res.error())); + + // 添加更详细的错误信息 + switch (res.error()) { + case httplib::Error::Connection: + error_msg += " (连接错误,服务器可能未运行或无法访问)"; + break; + case httplib::Error::Read: + error_msg += " (读取错误)"; + break; + case httplib::Error::Write: + error_msg += " (写入错误)"; + break; + case httplib::Error::ConnectionTimeout: + error_msg += " (连接超时)"; + break; + default: + error_msg += " (未知错误)"; + break; + } + + std::cerr << error_msg << std::endl; + return false; + } + } catch (const std::exception& e) { + std::cerr << "检查服务器可访问性时发生异常: " << e.what() << std::endl; + return false; + } +} + } // namespace mcp \ No newline at end of file diff --git a/src/mcp_server.cpp b/src/mcp_server.cpp index 4475211..ff5e0ab 100644 --- a/src/mcp_server.cpp +++ b/src/mcp_server.cpp @@ -11,7 +11,7 @@ namespace mcp { server::server(const std::string& host, int port, const std::string& sse_endpoint, const std::string& msg_endpoint_prefix) - : host_(host), port_(port), sse_endpoint_(sse_endpoint), msg_endpoint_prefix_(msg_endpoint_prefix), name_("MCP Server"), version_(MCP_VERSION) { + : host_(host), port_(port), sse_endpoint_(sse_endpoint), msg_endpoint_(msg_endpoint_prefix), name_("MCP Server"), version_(MCP_VERSION) { http_server_ = std::make_unique(); @@ -33,55 +33,49 @@ bool server::start(bool blocking) { return true; // Already running } - // Set up JSON-RPC endpoint - http_server_->Post(msg_endpoint_prefix_.c_str(), [this](const httplib::Request& req, httplib::Response& res) { - // 从URL参数中获取session_id - if (auto session_id = req.get_param_value("session_id"); !session_id.empty()) { - // 检查session是否存在 - std::lock_guard lock(mutex_); - if (session_dispatchers_.find(session_id) != session_dispatchers_.end()) { - // session存在,处理JSON-RPC请求 - handle_jsonrpc(req, res); - return; - } - } - - // session不存在,返回错误 - json error_response = { - {"jsonrpc", "2.0"}, - {"error", { - {"code", static_cast(error_code::invalid_request)}, - {"message", "Invalid or missing session ID. Initialize first to get a session ID"} - }}, - {"id", nullptr} - }; - res.set_content(error_response.dump(), "application/json"); + std::cerr << "启动MCP服务器: " << host_ << ":" << port_ << std::endl; + + // 设置CORS处理 + http_server_->Options(".*", [](const httplib::Request& req, httplib::Response& res) { + res.set_header("Access-Control-Allow-Origin", "*"); + res.set_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS"); + res.set_header("Access-Control-Allow-Headers", "Content-Type"); + res.status = 204; // No Content + }); + + // 设置JSON-RPC端点 + http_server_->Post(msg_endpoint_.c_str(), [this](const httplib::Request& req, httplib::Response& res) { + this->handle_jsonrpc(req, res); }); - // Set up SSE endpoint + // 设置SSE端点 http_server_->Get(sse_endpoint_.c_str(), [this](const httplib::Request& req, httplib::Response& res) { this->handle_sse(req, res); }); - // Start the server + // 启动服务器 if (blocking) { running_ = true; + std::cerr << "以阻塞模式启动服务器" << std::endl; + print_status(); if (!http_server_->listen(host_.c_str(), port_)) { running_ = false; - std::cerr << "Failed to start server on " << host_ << ":" << port_ << std::endl; + std::cerr << "无法在 " << host_ << ":" << port_ << " 上启动服务器" << std::endl; return false; } return true; } else { - // Start in a separate thread + // 在单独的线程中启动 server_thread_ = std::make_unique([this]() { + std::cerr << "在单独的线程中启动服务器" << std::endl; if (!http_server_->listen(host_.c_str(), port_)) { - std::cerr << "Failed to start server on " << host_ << ":" << port_ << std::endl; + std::cerr << "无法在 " << host_ << ":" << port_ << " 上启动服务器" << std::endl; running_ = false; return; } }); running_ = true; + print_status(); return true; } } @@ -91,27 +85,53 @@ void server::stop() { return; } + std::cerr << "正在停止MCP服务器..." << std::endl; + print_status(); + running_ = false; + + // 关闭所有SSE连接 + { + std::lock_guard lock(mutex_); + for (auto& [session_id, dispatcher] : session_dispatchers_) { + try { + std::cerr << "关闭会话: " << session_id << std::endl; + dispatcher->close(); + } catch (const std::exception& e) { + std::cerr << "关闭会话时发生异常: " << session_id << ", " << e.what() << std::endl; + } + } + } + + // 等待所有SSE线程结束 + { + std::lock_guard lock(mutex_); + for (auto it = sse_threads_.begin(); it != sse_threads_.end();) { + auto& [session_id, thread] = *it; + if (thread && thread->joinable()) { + try { + std::cerr << "等待会话线程结束: " << session_id << std::endl; + // 分离线程而不是等待它结束,避免可能的死锁 + thread->detach(); + } catch (const std::exception& e) { + std::cerr << "等待会话线程结束时发生异常: " << session_id << ", " << e.what() << std::endl; + } + } + it = sse_threads_.erase(it); + } + session_dispatchers_.clear(); + } + if (http_server_) { + std::cerr << "停止HTTP服务器..." << std::endl; http_server_->stop(); } if (server_thread_ && server_thread_->joinable()) { + std::cerr << "等待服务器线程结束..." << std::endl; server_thread_->join(); } - // 清理所有SSE线程 - { - std::lock_guard lock(mutex_); - for (auto& [session_id, thread] : sse_threads_) { - if (thread && thread->joinable()) { - thread->join(); - } - } - sse_threads_.clear(); - session_dispatchers_.clear(); - } - - running_ = false; + std::cerr << "MCP服务器已停止" << std::endl; } bool server::is_running() const { @@ -277,83 +297,176 @@ void server::set_auth_handler(std::function handler) { } void server::handle_sse(const httplib::Request& req, httplib::Response& res) { - // 生成会话ID并创建会话URI + // 生成会话ID std::string session_id = generate_session_id(); - std::string session_uri = msg_endpoint_prefix_ + "?session_id=" + session_id; + std::string session_uri = msg_endpoint_ + "?session_id=" + session_id; - // 创建一个共享的事件分发器,确保其生命周期 + std::cerr << "新的SSE连接: 客户端=" << req.remote_addr << ", 会话ID=" << session_id << std::endl; + + // 创建会话特定的事件分发器 auto session_dispatcher = std::make_shared(); + // 添加会话分发器到映射表 { std::lock_guard lock(mutex_); - // 存储会话信息 session_dispatchers_[session_id] = session_dispatcher; } - // 创建并启动会话线程,使用共享指针和值捕获而不是引用捕获 - sse_threads_[session_id] = std::make_unique([this, session_id, session_uri, session_dispatcher]() { + // 创建会话线程,使用值捕获而不是引用捕获 + auto thread = std::make_unique([this, session_id, session_uri, session_dispatcher]() { try { - // 等待一段时间 + std::cerr << "SSE会话线程启动: " << session_id << std::endl; + + // 发送初始会话URI std::this_thread::sleep_for(std::chrono::seconds(1)); - std::stringstream ss; - ss << "data: " << session_uri << "\n\n"; - - // 使用会话特定的分发器发送事件 + ss << "event: endpoint\ndata: " << session_uri << "\n\n"; session_dispatcher->send_event(ss.str()); + std::cerr << "发送会话URI: " << session_uri << " 到会话: " << session_id << std::endl; - // 设置定期心跳 - while (running_) { - std::this_thread::sleep_for(std::chrono::seconds(30)); + // 定期发送心跳,检测连接状态 + int heartbeat_count = 0; + while (running_ && !session_dispatcher->is_closed()) { + std::this_thread::sleep_for(std::chrono::seconds(10)); + + // 检查分发器是否已关闭 + if (session_dispatcher->is_closed()) { + std::cerr << "会话已关闭,停止心跳: " << session_id << std::endl; + break; + } // 发送心跳事件 std::stringstream heartbeat; - heartbeat << "event: heartbeat\ndata: {}\n\n"; - session_dispatcher->send_event(heartbeat.str()); + heartbeat << "event: heartbeat\ndata: " << heartbeat_count++ << "\n\n"; + + try { + session_dispatcher->send_event(heartbeat.str()); + std::cerr << "发送心跳到会话: " << session_id << ", 计数: " << heartbeat_count << std::endl; + } catch (const std::exception& e) { + std::cerr << "发送心跳失败,假定连接已关闭: " << e.what() << std::endl; + break; + } } + + std::cerr << "SSE会话线程退出: " << session_id << std::endl; } catch (const std::exception& e) { - // 记录错误 - std::cerr << "SSE thread error for session " << session_id << ": " << e.what() << std::endl; + std::cerr << "SSE会话线程异常: " << session_id << ", " << e.what() << std::endl; } - // 线程结束时清理资源 + // 清理资源 { std::lock_guard lock(mutex_); session_dispatchers_.erase(session_id); + sse_threads_.erase(session_id); } }); - // 不再使用detach,而是在server析构函数中管理线程生命周期 - - // 设置分块内容提供者 - res.set_chunked_content_provider("text/event-stream", [session_dispatcher](size_t /* offset */, httplib::DataSink& sink) { - // 使用会话特定的分发器等待事件 - session_dispatcher->wait_event(&sink); - return true; - }); - - // 注册会话特定的JSON-RPC端点 + // 存储线程 { std::lock_guard lock(mutex_); - http_server_->Post(session_uri, [this](const httplib::Request& req, httplib::Response& res) { - handle_jsonrpc(req, res); - }); + sse_threads_[session_id] = std::move(thread); } + + // 设置分块内容提供者 + res.set_chunked_content_provider("text/event-stream", [this, session_id, session_dispatcher](size_t /* offset */, httplib::DataSink& sink) { + try { + // 检查会话是否已关闭 + { + std::lock_guard lock(mutex_); + auto it = session_dispatchers_.find(session_id); + if (it == session_dispatchers_.end() || it->second->is_closed()) { + std::cerr << "会话已关闭,停止内容提供: " << session_id << std::endl; + + // 清理资源 + auto thread_it = sse_threads_.find(session_id); + if (thread_it != sse_threads_.end() && thread_it->second && thread_it->second->joinable()) { + thread_it->second->detach(); // 分离线程,让它自行清理 + } + session_dispatchers_.erase(session_id); + sse_threads_.erase(session_id); + + return false; + } + } + + // 等待事件 + bool result = session_dispatcher->wait_event(&sink); + if (!result) { + std::cerr << "等待事件失败,关闭连接: " << session_id << std::endl; + + // 关闭会话 + { + std::lock_guard lock(mutex_); + auto it = session_dispatchers_.find(session_id); + if (it != session_dispatchers_.end()) { + it->second->close(); + } + + // 清理资源 + auto thread_it = sse_threads_.find(session_id); + if (thread_it != sse_threads_.end() && thread_it->second && thread_it->second->joinable()) { + thread_it->second->detach(); // 分离线程,让它自行清理 + } + session_dispatchers_.erase(session_id); + sse_threads_.erase(session_id); + } + + return false; + } + + return true; + } catch (const std::exception& e) { + std::cerr << "SSE内容提供者异常: " << e.what() << std::endl; + return false; + } + }); } void server::handle_jsonrpc(const httplib::Request& req, httplib::Response& res) { - // Set response headers + // 设置响应头 res.set_header("Content-Type", "application/json"); + res.set_header("Access-Control-Allow-Origin", "*"); + res.set_header("Access-Control-Allow-Methods", "POST, OPTIONS"); + res.set_header("Access-Control-Allow-Headers", "Content-Type"); - // Get client address - std::string client_address = req.remote_addr; + // 处理OPTIONS请求(CORS预检) + if (req.method == "OPTIONS") { + res.status = 204; // No Content + return; + } - // Parse the request + // 获取会话ID + auto it = req.params.find("session_id"); + std::string session_id = it != req.params.end() ? it->second : ""; + + std::cerr << "收到JSON-RPC请求: 会话ID=" << session_id << ", 路径=" << req.path << std::endl; + + // 检查会话是否存在 + { + std::lock_guard lock(mutex_); + if (!session_id.empty() && session_dispatchers_.find(session_id) == session_dispatchers_.end()) { + std::cerr << "会话不存在: " << session_id << std::endl; + json error_response = { + {"jsonrpc", "2.0"}, + {"error", { + {"code", static_cast(error_code::invalid_request)}, + {"message", "Session not found"} + }}, + {"id", nullptr} + }; + res.set_content(error_response.dump(), "application/json"); + return; + } + } + + // 解析请求 json req_json; try { req_json = json::parse(req.body); + std::cerr << "请求内容: " << req_json.dump() << std::endl; } catch (const json::exception& e) { - // Invalid JSON + // 无效的JSON + std::cerr << "解析JSON失败: " << e.what() << std::endl; json error_response = { {"jsonrpc", "2.0"}, {"error", { @@ -366,9 +479,10 @@ void server::handle_jsonrpc(const httplib::Request& req, httplib::Response& res) return; } - // Check if it's a batch request + // 检查是否是批量请求 if (req_json.is_array()) { - // Batch request not supported yet + // 批量请求暂不支持 + std::cerr << "不支持批量请求" << std::endl; json error_response = { {"jsonrpc", "2.0"}, {"error", { @@ -381,7 +495,7 @@ void server::handle_jsonrpc(const httplib::Request& req, httplib::Response& res) return; } - // Convert to request object + // 转换为请求对象 request mcp_req; try { mcp_req.jsonrpc = req_json["jsonrpc"]; @@ -395,7 +509,8 @@ void server::handle_jsonrpc(const httplib::Request& req, httplib::Response& res) mcp_req.params = req_json["params"]; } } catch (const json::exception& e) { - // Invalid request + // 无效的请求 + std::cerr << "无效的请求: " << e.what() << std::endl; json error_response = { {"jsonrpc", "2.0"}, {"error", { @@ -408,57 +523,76 @@ void server::handle_jsonrpc(const httplib::Request& req, httplib::Response& res) return; } - // Process the request - json result = process_request(mcp_req, client_address); + // 处理请求 + std::cerr << "处理方法: " << mcp_req.method << std::endl; + json result = process_request(mcp_req, session_id); + std::cerr << "响应: " << result.dump() << std::endl; res.set_content(result.dump(), "application/json"); } -json server::process_request(const request& req, const std::string& client_address) { - // Check if it's a notification - if (req.is_notification()) { +json server::process_request(const request& req, const std::string& session_id) { + // 检查是否是通知 + if (req.is_notification()) { + std::cerr << "处理通知: " << req.method << std::endl; + // 通知没有响应 if (req.method == "notifications/initialized") { - set_client_initialized(client_address, true); + set_session_initialized(session_id, true); } - // No response for notifications return json::object(); } - // Handle method call + // 处理方法调用 try { - // Special case for initialize + std::cerr << "处理方法调用: " << req.method << std::endl; + + // 特殊情况:初始化 if (req.method == "initialize") { - return handle_initialize(req, client_address); + return handle_initialize(req); } else if (req.method == "ping") { - // The receiver MUST respond promptly with an empty response + // 接收者必须立即响应一个空响应 + std::cerr << "处理ping请求" << std::endl; return response::create_success(req.id, {}).to_json(); } + + if (!is_session_initialized(session_id)) { + return response::create_error( + req.id, + error_code::invalid_request, + "Session not initialized" + ).to_json(); + } - // Look for registered method handler + // 查找注册的方法处理器 std::lock_guard lock(mutex_); auto it = method_handlers_.find(req.method); if (it != method_handlers_.end()) { - // Call the handler + // 调用处理器 + std::cerr << "调用方法处理器: " << req.method << std::endl; json result = it->second(req.params); - // Create success response + // 创建成功响应 + std::cerr << "方法调用成功: " << req.method << std::endl; return response::create_success(req.id, result).to_json(); } - // Method not found + // 方法未找到 + std::cerr << "方法未找到: " << req.method << std::endl; return response::create_error( req.id, error_code::method_not_found, "Method not found: " + req.method ).to_json(); } catch (const mcp_exception& e) { - // MCP exception + // MCP异常 + std::cerr << "MCP异常: " << e.what() << ", 代码: " << static_cast(e.code()) << std::endl; return response::create_error( req.id, e.code(), e.what() ).to_json(); } catch (const std::exception& e) { - // Generic exception + // 通用异常 + std::cerr << "处理请求时发生异常: " << e.what() << std::endl; return response::create_error( req.id, error_code::internal_error, @@ -467,7 +601,7 @@ json server::process_request(const request& req, const std::string& client_addre } } -json server::handle_initialize(const request& req, const std::string& client_address) { +json server::handle_initialize(const request& req) { const json& params = req.params; // Version negotiation @@ -506,9 +640,6 @@ json server::handle_initialize(const request& req, const std::string& client_add } } - // Mark client as not initialized yet - set_client_initialized(client_address, false); - // Log connection // std::cout << "Client connected: " << client_name << " " << client_version << std::endl; @@ -524,17 +655,19 @@ json server::handle_initialize(const request& req, const std::string& client_add {"serverInfo", server_info} }; + // set_session_initialized(session_id, false); + return response::create_success(req.id, result).to_json(); } -void server::send_request(const std::string& client_address, const std::string& method, const json& params) { +void server::send_request(const std::string& session_id, const std::string& method, const json& params) { // Check if the method is ping or logging bool is_allowed_before_init = (method == "ping" || method == "logging"); // Check if client is initialized or if this is an allowed method - if (!is_allowed_before_init && !is_client_initialized(client_address)) { + if (!is_allowed_before_init && !is_session_initialized(session_id)) { // Client not initialized and method is not allowed before initialization - std::cerr << "Cannot send " << method << " request to client " << client_address + std::cerr << "Cannot send " << method << " request to session " << session_id << " before it is initialized" << std::endl; return; } @@ -546,17 +679,17 @@ void server::send_request(const std::string& client_address, const std::string& // This would typically involve sending an HTTP request to the client } -// Check if a client is initialized -bool server::is_client_initialized(const std::string& client_address) const { +// Check if a session is initialized +bool server::is_session_initialized(const std::string& session_id) const { std::lock_guard lock(mutex_); - auto it = client_initialized_.find(client_address); - return (it != client_initialized_.end() && it->second); + auto it = session_initialized_.find(session_id); + return (it != session_initialized_.end() && it->second); } -// Set client initialization status -void server::set_client_initialized(const std::string& client_address, bool initialized) { +// Set session initialization status +void server::set_session_initialized(const std::string& session_id, bool initialized) { std::lock_guard lock(mutex_); - client_initialized_[client_address] = initialized; + session_initialized_[session_id] = initialized; } // Generate a random session ID @@ -575,4 +708,48 @@ std::string server::generate_session_id() const { return ss.str(); } +void server::print_status() const { + std::lock_guard lock(mutex_); + + std::cerr << "=== MCP服务器状态 ===" << std::endl; + std::cerr << "服务器地址: " << host_ << ":" << port_ << std::endl; + std::cerr << "运行状态: " << (running_ ? "运行中" : "已停止") << std::endl; + std::cerr << "SSE端点: " << sse_endpoint_ << std::endl; + std::cerr << "消息端点前缀: " << msg_endpoint_ << std::endl; + + std::cerr << "活跃会话数: " << session_dispatchers_.size() << std::endl; + for (const auto& [session_id, dispatcher] : session_dispatchers_) { + std::cerr << " - 会话ID: " << session_id << ", 状态: " << (dispatcher->is_closed() ? "已关闭" : "活跃") << std::endl; + } + + std::cerr << "SSE线程数: " << sse_threads_.size() << std::endl; + + std::cerr << "注册的方法数: " << method_handlers_.size() << std::endl; + for (const auto& [method, _] : method_handlers_) { + std::cerr << " - " << method << std::endl; + } + + std::cerr << "注册的通知数: " << notification_handlers_.size() << std::endl; + for (const auto& [method, _] : notification_handlers_) { + std::cerr << " - " << method << std::endl; + } + + std::cerr << "注册的资源数: " << resources_.size() << std::endl; + for (const auto& [path, _] : resources_) { + std::cerr << " - " << path << std::endl; + } + + std::cerr << "注册的工具数: " << tools_.size() << std::endl; + for (const auto& [name, _] : tools_) { + std::cerr << " - " << name << std::endl; + } + + std::cerr << "已初始化的会话数: " << session_initialized_.size() << std::endl; + for (const auto& [session_id, initialized] : session_initialized_) { + std::cerr << " - " << session_id << ": " << (initialized ? "已初始化" : "未初始化") << std::endl; + } + + std::cerr << "======================" << std::endl; +} + } // namespace mcp \ No newline at end of file