diff --git a/include/mcp_server.h b/include/mcp_server.h index 9cbb945..a7b7435 100644 --- a/include/mcp_server.h +++ b/include/mcp_server.h @@ -54,6 +54,7 @@ public: std::unique_lock lk(m_); if (closed_.load(std::memory_order_acquire)) { + LOG_ERROR("Event dispatcher closed"); return false; } @@ -64,10 +65,12 @@ public: }); if (closed_.load(std::memory_order_acquire)) { + LOG_ERROR("Event dispatcher closed"); return false; } if (!result) { + LOG_ERROR("Event dispatcher timeout"); return false; } @@ -83,12 +86,14 @@ public: if (!message_copy.empty()) { if (!sink->write(message_copy.data(), message_copy.size())) { close(); + LOG_ERROR("Event dispatcher write failed"); return false; } } return true; } catch (...) { close(); + LOG_ERROR("Event dispatcher exception"); return false; } } diff --git a/src/mcp_client.cpp b/src/mcp_client.cpp index a82265d..760dd85 100644 --- a/src/mcp_client.cpp +++ b/src/mcp_client.cpp @@ -338,12 +338,9 @@ bool client::parse_sse_data(const char* data, size_t length) { std::string event_type = "message"; auto event_pos = sse_data.find("event: "); if (event_pos != std::string::npos) { - auto event_end = sse_data.find("\n", event_pos); + auto event_end = sse_data.find("\r\n", event_pos); if (event_end != std::string::npos) { event_type = sse_data.substr(event_pos + 7, event_end - (event_pos + 7)); - if (!event_type.empty() && event_type.back() == '\r') { - event_type.pop_back(); - } } } @@ -352,7 +349,7 @@ bool client::parse_sse_data(const char* data, size_t length) { return true; } - auto newline_pos = sse_data.find("\n", data_pos); + auto newline_pos = sse_data.find("\r\n", data_pos); if (newline_pos == std::string::npos) { newline_pos = sse_data.length(); } diff --git a/src/mcp_server.cpp b/src/mcp_server.cpp index 220c914..6a22da2 100644 --- a/src/mcp_server.cpp +++ b/src/mcp_server.cpp @@ -406,7 +406,7 @@ void server::handle_sse(const httplib::Request& req, httplib::Response& res) { // 发送初始会话URI std::this_thread::sleep_for(std::chrono::milliseconds(500)); std::stringstream ss; - ss << "event: endpoint\ndata: " << session_uri << "\n\n"; + ss << "event: endpoint\r\ndata: " << session_uri << "\r\n\r\n"; session_dispatcher->send_event(ss.str()); // 更新活动时间(发送消息后) @@ -415,14 +415,14 @@ void server::handle_sse(const httplib::Request& req, httplib::Response& res) { // 定期发送心跳,检测连接状态 int heartbeat_count = 0; while (running_ && !session_dispatcher->is_closed()) { - std::this_thread::sleep_for(std::chrono::seconds(10)); + std::this_thread::sleep_for(std::chrono::seconds(5) + std::chrono::milliseconds(rand() % 500)); // NOTE: DO NOT set it the same as the timeout of wait_event if (session_dispatcher->is_closed() || !running_) { break; } std::stringstream heartbeat; - heartbeat << "event: heartbeat\ndata: " << heartbeat_count++ << "\n\n"; + heartbeat << "event: heartbeat\r\ndata: " << heartbeat_count++ << "\r\n\r\n"; try { bool sent = session_dispatcher->send_event(heartbeat.str()); @@ -630,7 +630,7 @@ void server::handle_jsonrpc(const httplib::Request& req, httplib::Response& res) // 通过SSE发送响应 std::stringstream ss; - ss << "event: message\ndata: " << response_json.dump() << "\n\n"; + ss << "event: message\r\ndata: " << response_json.dump() << "\r\n\r\n"; bool result = dispatcher->send_event(ss.str()); if (!result) { @@ -830,7 +830,7 @@ void server::send_request(const std::string& session_id, const std::string& meth // 发送请求 std::stringstream ss; - ss << "event: message\ndata: " << req.to_json().dump() << "\n\n"; + ss << "event: message\r\ndata: " << req.to_json().dump() << "\r\n\r\n"; bool result = dispatcher->send_event(ss.str()); if (!result) {