Compare commits

..

No commits in common. "4f2f474c280e85eeeede7592305588a248ae2d21" and "62eb1ad2e4899099e0f74ce27f50519c6bae2f73" have entirely different histories.

6 changed files with 88 additions and 781 deletions

View File

@ -12,7 +12,7 @@
int main() {
// Create a client
mcp::client client("localhost", 8089);
mcp::client client("localhost", 8080);
// Set capabilites
mcp::json capabilities = {

View File

@ -122,7 +122,7 @@ int main() {
std::filesystem::create_directories("./files");
// Create and configure server
mcp::server server("localhost", 8089);
mcp::server server("localhost", 8080);
server.set_server_info("ExampleServer", "2024-11-05");
// Set server capabilities
@ -165,7 +165,7 @@ int main() {
// server.register_resource("/api", api_resource);
// Start server
std::cout << "Starting MCP server at localhost:8089..." << std::endl;
std::cout << "Starting MCP server at localhost:8080..." << std::endl;
std::cout << "Press Ctrl+C to stop the server" << std::endl;
server.start(true); // Blocking mode

View File

@ -21,8 +21,6 @@
#include <memory>
#include <mutex>
#include <functional>
#include <atomic>
#include <condition_variable>
namespace mcp {
@ -40,14 +38,14 @@ public:
* @param host The server host (e.g., "localhost", "example.com")
* @param port The server port
*/
client(const std::string& host, int port = 8080, const json& capabilities = json::object(), const std::string& sse_endpoint = "/sse");
client(const std::string& host, int port = 8080, const json& capabilities = json::object());
/**
* @brief Constructor
* @param base_url The base URL of the server (e.g., "localhost:8080")
* @param base_url The base URL of the server (e.g., "http://localhost:8080")
* @param capabilities The capabilities of the client
*/
client(const std::string& base_url, const json& capabilities = json::object(), const std::string& sse_endpoint = "/sse");
client(const std::string& base_url, const json& capabilities = json::object());
/**
* @brief Destructor
@ -166,18 +164,10 @@ public:
*/
json list_resource_templates();
/**
* @brief 访
* @return True if the server is accessible
*/
bool check_server_accessible();
private:
std::string base_url_;
std::string host_;
int port_;
std::string sse_endpoint_;
std::string msg_endpoint_;
std::string auth_token_;
int timeout_seconds_ = 30;
json capabilities_;
@ -192,21 +182,9 @@ private:
// Mutex for thread safety
mutable std::mutex mutex_;
// 条件变量,用于等待消息端点设置
std::condition_variable endpoint_cv_;
// SSE connection
std::unique_ptr<std::thread> sse_thread_;
// SSE连接状态
std::atomic<bool> sse_running_{false};
// Initialize the client
void init_client(const std::string& host, int port);
void init_client(const std::string& base_url);
void open_sse_connection();
void close_sse_connection();
bool parse_sse_data(const char* data, size_t length);
// Send a JSON-RPC request and get the response
json send_jsonrpc(const request& req);

View File

@ -23,89 +23,9 @@
#include <mutex>
#include <thread>
#include <functional>
#include <chrono>
#include <condition_variable>
namespace mcp {
class event_dispatcher {
public:
event_dispatcher() = default;
bool wait_event(httplib::DataSink* sink, const std::chrono::milliseconds& timeout = std::chrono::milliseconds(30000)) {
if (!sink) {
return false;
}
std::unique_lock<std::mutex> lk(m_);
// 如果连接已关闭返回false
if (closed_) {
return false;
}
int id = id_;
// 使用超时等待
bool result = cv_.wait_for(lk, timeout, [&] {
return cid_ == id || closed_;
});
// 如果连接已关闭或等待超时返回false
if (closed_) {
return false;
}
if (!result) {
std::cerr << "等待事件超时" << std::endl;
return false;
}
// 写入数据
try {
sink->write(message_.data(), message_.size());
return true;
} catch (const std::exception& e) {
std::cerr << "写入事件数据失败: " << e.what() << std::endl;
closed_ = true;
return false;
}
}
void send_event(const std::string& message) {
std::lock_guard<std::mutex> lk(m_);
// 如果连接已关闭,抛出异常
if (closed_) {
throw std::runtime_error("连接已关闭");
}
cid_ = id_++;
message_ = message;
cv_.notify_all();
}
void close() {
std::lock_guard<std::mutex> lk(m_);
closed_ = true;
cv_.notify_all();
}
bool is_closed() const {
std::lock_guard<std::mutex> lk(m_);
return closed_;
}
private:
mutable std::mutex m_;
std::condition_variable cv_;
std::atomic<int> id_{0};
std::atomic<int> cid_{-1};
std::string message_;
bool closed_ = false;
};
/**
* @class server
* @brief Main MCP server class
@ -120,9 +40,7 @@ public:
* @param host The host to bind to (e.g., "localhost", "0.0.0.0")
* @param port The port to listen on
*/
server(const std::string& host = "localhost", int port = 8080,
const std::string& sse_endpoint = "/sse",
const std::string& msg_endpoint_prefix = "/message");
server(const std::string& host = "localhost", int port = 8080);
/**
* @brief Destructor
@ -202,21 +120,14 @@ public:
/**
* @brief Send a request to a client
* @param session_id The session ID of the client
* @param client_address The address of the client
* @param method The method to call
* @param params The parameters to pass
*
* This method will only send requests other than ping and logging
* after the client has sent the initialized notification.
*/
void send_request(const std::string& session_id, const std::string& method, const json& params = json::object());
/**
* @brief
*
*
*/
void print_status() const;
void send_request(const std::string& client_address, const std::string& method, const json& params = json::object());
private:
std::string host_;
@ -230,19 +141,6 @@ private:
// Server thread (for non-blocking mode)
std::unique_ptr<std::thread> server_thread_;
// SSE thread
std::map<std::string, std::unique_ptr<std::thread>> sse_threads_;
// Event dispatcher for server-sent events
event_dispatcher sse_dispatcher_;
// Session-specific event dispatchers
std::map<std::string, std::shared_ptr<event_dispatcher>> session_dispatchers_;
// Server-sent events endpoint
std::string sse_endpoint_;
std::string msg_endpoint_;
// Method handlers
std::map<std::string, std::function<json(const json&)>> method_handlers_;
@ -265,29 +163,23 @@ private:
// Running flag
bool running_ = false;
// Map to track session initialization status (session_id -> initialized)
std::map<std::string, bool> session_initialized_;
// Handle SSE requests
void handle_sse(const httplib::Request& req, httplib::Response& res);
// Map to track client initialization status (client_address -> initialized)
std::map<std::string, bool> client_initialized_;
// Handle incoming JSON-RPC requests
void handle_jsonrpc(const httplib::Request& req, httplib::Response& res);
// Process a JSON-RPC request
json process_request(const request& req, const std::string& session_id);
json process_request(const request& req, const std::string& client_address);
// Handle initialization request
json handle_initialize(const request& req);
json handle_initialize(const request& req, const std::string& client_address);
// Check if a session is initialized
bool is_session_initialized(const std::string& session_id) const;
// Check if a client is initialized
bool is_client_initialized(const std::string& client_address) const;
// Set session initialization status
void set_session_initialized(const std::string& session_id, bool initialized);
// Generate a random session ID
std::string generate_session_id() const;
// Set client initialization status
void set_client_initialized(const std::string& client_address, bool initialized);
};

View File

@ -11,22 +11,19 @@
namespace mcp {
client::client(const std::string& host, int port, const json& capabilities, const std::string& sse_endpoint)
: host_(host), port_(port), capabilities_(capabilities), sse_endpoint_(sse_endpoint) {
client::client(const std::string& host, int port, const json& capabilities)
: host_(host), port_(port), capabilities_(capabilities) {
init_client(host, port);
}
client::client(const std::string& base_url, const json& capabilities, const std::string& sse_endpoint)
: base_url_(base_url), capabilities_(capabilities), sse_endpoint_(sse_endpoint) {
client::client(const std::string& base_url, const json& capabilities)
: base_url_(base_url), capabilities_(capabilities) {
init_client(base_url);
}
client::~client() {
// 关闭SSE连接
close_sse_connection();
// httplib::Client将自动销毁
// httplib::Client will be automatically destroyed
}
void client::init_client(const std::string& host, int port) {
@ -50,14 +47,6 @@ void client::init_client(const std::string& base_url) {
}
bool client::initialize(const std::string& client_name, const std::string& client_version) {
std::cerr << "开始初始化MCP客户端..." << std::endl;
// 检查服务器是否可访问
if (!check_server_accessible()) {
std::cerr << "服务器不可访问,初始化失败" << std::endl;
return false;
}
// Create initialization request
request req = request::create("initialize", {
{"protocolVersion", MCP_VERSION},
@ -69,68 +58,19 @@ bool client::initialize(const std::string& client_name, const std::string& clien
});
try {
// 打开SSE连接
std::cerr << "正在打开SSE连接..." << std::endl;
open_sse_connection();
// 等待SSE连接建立并获取消息端点
// 使用条件变量和超时机制
const auto timeout = std::chrono::milliseconds(5000); // 5秒超时
{
std::unique_lock<std::mutex> lock(mutex_);
// 检查初始状态
if (!msg_endpoint_.empty()) {
std::cerr << "消息端点已经设置: " << msg_endpoint_ << std::endl;
} else {
std::cerr << "等待条件变量..." << std::endl;
}
bool success = endpoint_cv_.wait_for(lock, timeout, [this]() {
if (!sse_running_) {
std::cerr << "SSE连接已关闭停止等待" << std::endl;
return true;
}
if (!msg_endpoint_.empty()) {
std::cerr << "消息端点已设置,停止等待" << std::endl;
return true;
}
return false;
});
// 检查等待结果
if (!success) {
std::cerr << "条件变量等待超时" << std::endl;
}
// 如果SSE连接已关闭或等待超时抛出异常
if (!sse_running_) {
throw std::runtime_error("SSE连接已关闭未能获取消息端点");
}
if (msg_endpoint_.empty()) {
throw std::runtime_error("等待SSE连接超时未能获取消息端点");
}
std::cerr << "成功获取消息端点: " << msg_endpoint_ << std::endl;
}
// 发送初始化请求
// Send the request
json result = send_jsonrpc(req);
// 存储服务器能力
// Store server capabilities
server_capabilities_ = result["capabilities"];
// 发送已初始化通知
// Send initialized notification
request notification = request::create_notification("initialized");
send_jsonrpc(notification);
return true;
} catch (const std::exception& e) {
// 初始化失败关闭SSE连接
std::cerr << "初始化失败: " << e.what() << std::endl;
close_sse_connection();
// Initialization failed
return false;
}
}
@ -260,175 +200,9 @@ json client::list_resource_templates() {
return send_request("resources/templates/list").result;
}
void client::open_sse_connection() {
// 设置SSE连接状态为运行中
sse_running_ = true;
// 清空消息端点
{
std::lock_guard<std::mutex> lock(mutex_);
msg_endpoint_.clear();
// 通知等待的线程虽然消息端点为空但可以让等待的线程检查sse_running_状态
endpoint_cv_.notify_all();
}
// 打印连接信息(调试用)
std::string connection_info;
if (!base_url_.empty()) {
connection_info = "Base URL: " + base_url_ + ", SSE Endpoint: " + sse_endpoint_;
} else {
connection_info = "Host: " + host_ + ", Port: " + std::to_string(port_) + ", SSE Endpoint: " + sse_endpoint_;
}
std::cerr << "尝试建立SSE连接: " << connection_info << std::endl;
// 创建并启动SSE线程
sse_thread_ = std::make_unique<std::thread>([this]() {
int retry_count = 0;
const int max_retries = 5;
const int retry_delay_base = 1000; // 毫秒
while (sse_running_) {
try {
// 尝试建立SSE连接
std::cerr << "SSE线程: 尝试连接到 " << sse_endpoint_ << std::endl;
auto res = http_client_->Get(sse_endpoint_.c_str(),
[this](const char *data, size_t data_length) {
// 解析SSE数据
std::cerr << "SSE线程: 收到数据,长度: " << data_length << std::endl;
if (!parse_sse_data(data, data_length)) {
std::cerr << "SSE线程: 解析数据失败" << std::endl;
return false; // 解析失败,关闭连接
}
return sse_running_.load(); // 如果sse_running_为false关闭连接
});
// 检查连接是否成功
if (!res) {
std::string error_msg = "SSE连接失败: ";
error_msg += "错误代码: " + std::to_string(static_cast<int>(res.error()));
// 添加更详细的错误信息
switch (res.error()) {
case httplib::Error::Connection:
error_msg += " (连接错误,服务器可能未运行或无法访问)";
break;
case httplib::Error::Read:
error_msg += " (读取错误,服务器可能关闭了连接或响应格式不正确)";
break;
case httplib::Error::Write:
error_msg += " (写入错误)";
break;
case httplib::Error::ConnectionTimeout:
error_msg += " (连接超时)";
break;
case httplib::Error::Canceled:
error_msg += " (请求被取消)";
break;
default:
error_msg += " (未知错误)";
break;
}
throw std::runtime_error(error_msg);
}
// 连接成功后重置重试计数
retry_count = 0;
std::cerr << "SSE线程: 连接成功" << std::endl;
} catch (const std::exception& e) {
// 记录错误
std::cerr << "SSE连接错误: " << e.what() << std::endl;
// 如果已达到最大重试次数,停止尝试
if (++retry_count > max_retries) {
std::cerr << "达到最大重试次数停止SSE连接尝试" << std::endl;
break;
}
// 指数退避重试
int delay = retry_delay_base * (1 << (retry_count - 1)); // 2^(retry_count-1) * base_delay
std::cerr << "将在 " << delay << " 毫秒后重试 (尝试 " << retry_count << "/" << max_retries << ")" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(delay));
}
}
std::cerr << "SSE线程: 退出" << std::endl;
});
}
// 新增方法解析SSE数据
bool client::parse_sse_data(const char* data, size_t length) {
try {
std::string sse_data(data, length);
// 查找"data:"标记
auto data_pos = sse_data.find("data: ");
if (data_pos == std::string::npos) {
return true; // 不是数据事件,可能是注释或心跳,继续保持连接
}
// 查找数据行结束位置
auto newline_pos = sse_data.find("\n", data_pos);
if (newline_pos == std::string::npos) {
newline_pos = sse_data.length(); // 如果没有换行符,使用整个字符串
}
// 提取数据内容
std::string data_content = sse_data.substr(data_pos + 6, newline_pos - (data_pos + 6));
// 检查是否是心跳事件
if (sse_data.find("event: heartbeat") != std::string::npos) {
// 心跳事件,不需要处理数据
return true;
}
// 更新消息端点
{
std::lock_guard<std::mutex> lock(mutex_);
msg_endpoint_ = data_content;
// 通知等待的线程
endpoint_cv_.notify_all();
}
return true;
} catch (const std::exception& e) {
std::cerr << "解析SSE数据错误: " << e.what() << std::endl;
return false;
}
}
// 新增方法关闭SSE连接
void client::close_sse_connection() {
sse_running_ = false;
if (sse_thread_ && sse_thread_->joinable()) {
sse_thread_->join();
}
// 清空消息端点
{
std::lock_guard<std::mutex> lock(mutex_);
msg_endpoint_.clear();
// 通知等待的线程虽然消息端点为空但可以让等待的线程检查sse_running_状态
endpoint_cv_.notify_all();
}
}
json client::send_jsonrpc(const request& req) {
std::lock_guard<std::mutex> lock(mutex_);
// 检查消息端点是否已设置
if (msg_endpoint_.empty()) {
throw mcp_exception(error_code::internal_error, "消息端点未设置SSE连接可能未建立");
}
// 打印请求信息(调试用)
std::cerr << "发送JSON-RPC请求: 方法=" << req.method << ", 端点=" << msg_endpoint_ << std::endl;
// Convert request to JSON
json req_json = req.to_json();
std::string req_body = req_json.dump();
@ -443,33 +217,25 @@ json client::send_jsonrpc(const request& req) {
}
// Send the request
auto result = http_client_->Post(msg_endpoint_, headers, req_body, "application/json");
auto result = http_client_->Post("/jsonrpc", headers, req_body, "application/json");
if (!result) {
// Error occurred
auto err = result.error();
std::string error_msg;
switch (err) {
case httplib::Error::Connection:
error_msg = "连接错误,服务器可能未运行或无法访问";
break;
throw mcp_exception(error_code::server_error_start, "Connection error");
case httplib::Error::Read:
error_msg = "读取错误,服务器可能关闭了连接或响应格式不正确";
break;
throw mcp_exception(error_code::internal_error, "Read error");
case httplib::Error::Write:
error_msg = "写入错误";
break;
throw mcp_exception(error_code::internal_error, "Write error");
case httplib::Error::ConnectionTimeout:
error_msg = "连接超时";
break;
throw mcp_exception(error_code::server_error_start, "Timeout error");
default:
error_msg = "HTTP客户端错误: " + std::to_string(static_cast<int>(err));
break;
throw mcp_exception(error_code::internal_error,
"HTTP client error: " + std::to_string(static_cast<int>(err)));
}
std::cerr << "JSON-RPC请求失败: " << error_msg << std::endl;
throw mcp_exception(error_code::internal_error, error_msg);
}
// Check if it's a notification (no response expected)
@ -481,9 +247,6 @@ json client::send_jsonrpc(const request& req) {
try {
json res_json = json::parse(result->body);
// 打印响应信息(调试用)
std::cerr << "收到JSON-RPC响应: " << res_json.dump() << std::endl;
// Check for error
if (res_json.contains("error")) {
int code = res_json["error"]["code"];
@ -504,45 +267,4 @@ json client::send_jsonrpc(const request& req) {
}
}
bool client::check_server_accessible() {
std::cerr << "检查服务器是否可访问..." << std::endl;
try {
// 尝试发送一个简单的GET请求到服务器
auto res = http_client_->Get("/");
if (res) {
std::cerr << "服务器可访问,状态码: " << res->status << std::endl;
return true;
} else {
std::string error_msg = "服务器不可访问,错误代码: " + std::to_string(static_cast<int>(res.error()));
// 添加更详细的错误信息
switch (res.error()) {
case httplib::Error::Connection:
error_msg += " (连接错误,服务器可能未运行或无法访问)";
break;
case httplib::Error::Read:
error_msg += " (读取错误)";
break;
case httplib::Error::Write:
error_msg += " (写入错误)";
break;
case httplib::Error::ConnectionTimeout:
error_msg += " (连接超时)";
break;
default:
error_msg += " (未知错误)";
break;
}
std::cerr << error_msg << std::endl;
return false;
}
} catch (const std::exception& e) {
std::cerr << "检查服务器可访问性时发生异常: " << e.what() << std::endl;
return false;
}
}
} // namespace mcp

View File

@ -10,8 +10,8 @@
namespace mcp {
server::server(const std::string& host, int port, const std::string& sse_endpoint, const std::string& msg_endpoint_prefix)
: host_(host), port_(port), sse_endpoint_(sse_endpoint), msg_endpoint_(msg_endpoint_prefix), name_("MCP Server"), version_(MCP_VERSION) {
server::server(const std::string& host, int port)
: host_(host), port_(port), name_("MCP Server"), version_(MCP_VERSION) {
http_server_ = std::make_unique<httplib::Server>();
@ -33,49 +33,30 @@ bool server::start(bool blocking) {
return true; // Already running
}
std::cerr << "启动MCP服务器: " << host_ << ":" << port_ << std::endl;
// 设置CORS处理
http_server_->Options(".*", [](const httplib::Request& req, httplib::Response& res) {
res.set_header("Access-Control-Allow-Origin", "*");
res.set_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS");
res.set_header("Access-Control-Allow-Headers", "Content-Type");
res.status = 204; // No Content
});
// 设置JSON-RPC端点
http_server_->Post(msg_endpoint_.c_str(), [this](const httplib::Request& req, httplib::Response& res) {
// Set up JSON-RPC endpoint
http_server_->Post("/jsonrpc", [this](const httplib::Request& req, httplib::Response& res) {
this->handle_jsonrpc(req, res);
});
// 设置SSE端点
http_server_->Get(sse_endpoint_.c_str(), [this](const httplib::Request& req, httplib::Response& res) {
this->handle_sse(req, res);
});
// 启动服务器
// Start the server
if (blocking) {
running_ = true;
std::cerr << "以阻塞模式启动服务器" << std::endl;
print_status();
if (!http_server_->listen(host_.c_str(), port_)) {
running_ = false;
std::cerr << "无法在 " << host_ << ":" << port_ << " 上启动服务器" << std::endl;
std::cerr << "Failed to start server on " << host_ << ":" << port_ << std::endl;
return false;
}
return true;
} else {
// 在单独的线程中启动
// Start in a separate thread
server_thread_ = std::make_unique<std::thread>([this]() {
std::cerr << "在单独的线程中启动服务器" << std::endl;
if (!http_server_->listen(host_.c_str(), port_)) {
std::cerr << "无法在 " << host_ << ":" << port_ << " 上启动服务器" << std::endl;
std::cerr << "Failed to start server on " << host_ << ":" << port_ << std::endl;
running_ = false;
return;
}
});
running_ = true;
print_status();
return true;
}
}
@ -85,53 +66,15 @@ void server::stop() {
return;
}
std::cerr << "正在停止MCP服务器..." << std::endl;
print_status();
running_ = false;
// 关闭所有SSE连接
{
std::lock_guard<std::mutex> lock(mutex_);
for (auto& [session_id, dispatcher] : session_dispatchers_) {
try {
std::cerr << "关闭会话: " << session_id << std::endl;
dispatcher->close();
} catch (const std::exception& e) {
std::cerr << "关闭会话时发生异常: " << session_id << ", " << e.what() << std::endl;
}
}
}
// 等待所有SSE线程结束
{
std::lock_guard<std::mutex> lock(mutex_);
for (auto it = sse_threads_.begin(); it != sse_threads_.end();) {
auto& [session_id, thread] = *it;
if (thread && thread->joinable()) {
try {
std::cerr << "等待会话线程结束: " << session_id << std::endl;
// 分离线程而不是等待它结束,避免可能的死锁
thread->detach();
} catch (const std::exception& e) {
std::cerr << "等待会话线程结束时发生异常: " << session_id << ", " << e.what() << std::endl;
}
}
it = sse_threads_.erase(it);
}
session_dispatchers_.clear();
}
if (http_server_) {
std::cerr << "停止HTTP服务器..." << std::endl;
http_server_->stop();
}
if (server_thread_ && server_thread_->joinable()) {
std::cerr << "等待服务器线程结束..." << std::endl;
server_thread_->join();
}
std::cerr << "MCP服务器已停止" << std::endl;
running_ = false;
}
bool server::is_running() const {
@ -296,177 +239,19 @@ void server::set_auth_handler(std::function<bool(const std::string&)> handler) {
auth_handler_ = handler;
}
void server::handle_sse(const httplib::Request& req, httplib::Response& res) {
// 生成会话ID
std::string session_id = generate_session_id();
std::string session_uri = msg_endpoint_ + "?session_id=" + session_id;
std::cerr << "新的SSE连接: 客户端=" << req.remote_addr << ", 会话ID=" << session_id << std::endl;
// 创建会话特定的事件分发器
auto session_dispatcher = std::make_shared<event_dispatcher>();
// 添加会话分发器到映射表
{
std::lock_guard<std::mutex> lock(mutex_);
session_dispatchers_[session_id] = session_dispatcher;
}
// 创建会话线程,使用值捕获而不是引用捕获
auto thread = std::make_unique<std::thread>([this, session_id, session_uri, session_dispatcher]() {
try {
std::cerr << "SSE会话线程启动: " << session_id << std::endl;
// 发送初始会话URI
std::this_thread::sleep_for(std::chrono::seconds(1));
std::stringstream ss;
ss << "event: endpoint\ndata: " << session_uri << "\n\n";
session_dispatcher->send_event(ss.str());
std::cerr << "发送会话URI: " << session_uri << " 到会话: " << session_id << std::endl;
// 定期发送心跳,检测连接状态
int heartbeat_count = 0;
while (running_ && !session_dispatcher->is_closed()) {
std::this_thread::sleep_for(std::chrono::seconds(10));
// 检查分发器是否已关闭
if (session_dispatcher->is_closed()) {
std::cerr << "会话已关闭,停止心跳: " << session_id << std::endl;
break;
}
// 发送心跳事件
std::stringstream heartbeat;
heartbeat << "event: heartbeat\ndata: " << heartbeat_count++ << "\n\n";
try {
session_dispatcher->send_event(heartbeat.str());
std::cerr << "发送心跳到会话: " << session_id << ", 计数: " << heartbeat_count << std::endl;
} catch (const std::exception& e) {
std::cerr << "发送心跳失败,假定连接已关闭: " << e.what() << std::endl;
break;
}
}
std::cerr << "SSE会话线程退出: " << session_id << std::endl;
} catch (const std::exception& e) {
std::cerr << "SSE会话线程异常: " << session_id << ", " << e.what() << std::endl;
}
// 清理资源
{
std::lock_guard<std::mutex> lock(mutex_);
session_dispatchers_.erase(session_id);
sse_threads_.erase(session_id);
}
});
// 存储线程
{
std::lock_guard<std::mutex> lock(mutex_);
sse_threads_[session_id] = std::move(thread);
}
// 设置分块内容提供者
res.set_chunked_content_provider("text/event-stream", [this, session_id, session_dispatcher](size_t /* offset */, httplib::DataSink& sink) {
try {
// 检查会话是否已关闭
{
std::lock_guard<std::mutex> lock(mutex_);
auto it = session_dispatchers_.find(session_id);
if (it == session_dispatchers_.end() || it->second->is_closed()) {
std::cerr << "会话已关闭,停止内容提供: " << session_id << std::endl;
// 清理资源
auto thread_it = sse_threads_.find(session_id);
if (thread_it != sse_threads_.end() && thread_it->second && thread_it->second->joinable()) {
thread_it->second->detach(); // 分离线程,让它自行清理
}
session_dispatchers_.erase(session_id);
sse_threads_.erase(session_id);
return false;
}
}
// 等待事件
bool result = session_dispatcher->wait_event(&sink);
if (!result) {
std::cerr << "等待事件失败,关闭连接: " << session_id << std::endl;
// 关闭会话
{
std::lock_guard<std::mutex> lock(mutex_);
auto it = session_dispatchers_.find(session_id);
if (it != session_dispatchers_.end()) {
it->second->close();
}
// 清理资源
auto thread_it = sse_threads_.find(session_id);
if (thread_it != sse_threads_.end() && thread_it->second && thread_it->second->joinable()) {
thread_it->second->detach(); // 分离线程,让它自行清理
}
session_dispatchers_.erase(session_id);
sse_threads_.erase(session_id);
}
return false;
}
return true;
} catch (const std::exception& e) {
std::cerr << "SSE内容提供者异常: " << e.what() << std::endl;
return false;
}
});
}
void server::handle_jsonrpc(const httplib::Request& req, httplib::Response& res) {
// 设置响应头
// Set response headers
res.set_header("Content-Type", "application/json");
res.set_header("Access-Control-Allow-Origin", "*");
res.set_header("Access-Control-Allow-Methods", "POST, OPTIONS");
res.set_header("Access-Control-Allow-Headers", "Content-Type");
// 处理OPTIONS请求CORS预检
if (req.method == "OPTIONS") {
res.status = 204; // No Content
return;
}
// Get client address
std::string client_address = req.remote_addr;
// 获取会话ID
auto it = req.params.find("session_id");
std::string session_id = it != req.params.end() ? it->second : "";
std::cerr << "收到JSON-RPC请求: 会话ID=" << session_id << ", 路径=" << req.path << std::endl;
// 检查会话是否存在
{
std::lock_guard<std::mutex> lock(mutex_);
if (!session_id.empty() && session_dispatchers_.find(session_id) == session_dispatchers_.end()) {
std::cerr << "会话不存在: " << session_id << std::endl;
json error_response = {
{"jsonrpc", "2.0"},
{"error", {
{"code", static_cast<int>(error_code::invalid_request)},
{"message", "Session not found"}
}},
{"id", nullptr}
};
res.set_content(error_response.dump(), "application/json");
return;
}
}
// 解析请求
// Parse the request
json req_json;
try {
req_json = json::parse(req.body);
std::cerr << "请求内容: " << req_json.dump() << std::endl;
} catch (const json::exception& e) {
// 无效的JSON
std::cerr << "解析JSON失败: " << e.what() << std::endl;
// Invalid JSON
json error_response = {
{"jsonrpc", "2.0"},
{"error", {
@ -479,10 +264,9 @@ void server::handle_jsonrpc(const httplib::Request& req, httplib::Response& res)
return;
}
// 检查是否是批量请求
// Check if it's a batch request
if (req_json.is_array()) {
// 批量请求暂不支持
std::cerr << "不支持批量请求" << std::endl;
// Batch request not supported yet
json error_response = {
{"jsonrpc", "2.0"},
{"error", {
@ -495,7 +279,7 @@ void server::handle_jsonrpc(const httplib::Request& req, httplib::Response& res)
return;
}
// 转换为请求对象
// Convert to request object
request mcp_req;
try {
mcp_req.jsonrpc = req_json["jsonrpc"];
@ -509,8 +293,7 @@ void server::handle_jsonrpc(const httplib::Request& req, httplib::Response& res)
mcp_req.params = req_json["params"];
}
} catch (const json::exception& e) {
// 无效的请求
std::cerr << "无效的请求: " << e.what() << std::endl;
// Invalid request
json error_response = {
{"jsonrpc", "2.0"},
{"error", {
@ -523,76 +306,67 @@ void server::handle_jsonrpc(const httplib::Request& req, httplib::Response& res)
return;
}
// 处理请求
std::cerr << "处理方法: " << mcp_req.method << std::endl;
json result = process_request(mcp_req, session_id);
std::cerr << "响应: " << result.dump() << std::endl;
// Process the request
json result = process_request(mcp_req, client_address);
res.set_content(result.dump(), "application/json");
}
json server::process_request(const request& req, const std::string& session_id) {
// 检查是否是通知
if (req.is_notification()) {
std::cerr << "处理通知: " << req.method << std::endl;
// 通知没有响应
json server::process_request(const request& req, const std::string& client_address) {
// Check if it's a notification
if (req.is_notification()) {
if (req.method == "notifications/initialized") {
set_session_initialized(session_id, true);
set_client_initialized(client_address, true);
}
// No response for notifications
return json::object();
}
// 处理方法调用
// Handle method call
try {
std::cerr << "处理方法调用: " << req.method << std::endl;
// 特殊情况:初始化
// Special case for initialize
if (req.method == "initialize") {
return handle_initialize(req);
return handle_initialize(req, client_address);
} else if (req.method == "ping") {
// 接收者必须立即响应一个空响应
std::cerr << "处理ping请求" << std::endl;
// The receiver MUST respond promptly with an empty response
return response::create_success(req.id, {}).to_json();
}
if (!is_session_initialized(session_id)) {
// Check if client is initialized
if (!is_client_initialized(client_address)) {
// Client not initialized
return response::create_error(
req.id,
error_code::invalid_request,
"Session not initialized"
req.id,
error_code::invalid_request,
"Client not initialized"
).to_json();
}
// 查找注册的方法处理器
// Look for registered method handler
std::lock_guard<std::mutex> lock(mutex_);
auto it = method_handlers_.find(req.method);
if (it != method_handlers_.end()) {
// 调用处理器
std::cerr << "调用方法处理器: " << req.method << std::endl;
// Call the handler
json result = it->second(req.params);
// 创建成功响应
std::cerr << "方法调用成功: " << req.method << std::endl;
// Create success response
return response::create_success(req.id, result).to_json();
}
// 方法未找到
std::cerr << "方法未找到: " << req.method << std::endl;
// Method not found
return response::create_error(
req.id,
error_code::method_not_found,
"Method not found: " + req.method
).to_json();
} catch (const mcp_exception& e) {
// MCP异常
std::cerr << "MCP异常: " << e.what() << ", 代码: " << static_cast<int>(e.code()) << std::endl;
// MCP exception
return response::create_error(
req.id,
e.code(),
e.what()
).to_json();
} catch (const std::exception& e) {
// 通用异常
std::cerr << "处理请求时发生异常: " << e.what() << std::endl;
// Generic exception
return response::create_error(
req.id,
error_code::internal_error,
@ -601,7 +375,7 @@ json server::process_request(const request& req, const std::string& session_id)
}
}
json server::handle_initialize(const request& req) {
json server::handle_initialize(const request& req, const std::string& client_address) {
const json& params = req.params;
// Version negotiation
@ -640,6 +414,9 @@ json server::handle_initialize(const request& req) {
}
}
// Mark client as not initialized yet
set_client_initialized(client_address, false);
// Log connection
// std::cout << "Client connected: " << client_name << " " << client_version << std::endl;
@ -655,19 +432,17 @@ json server::handle_initialize(const request& req) {
{"serverInfo", server_info}
};
// set_session_initialized(session_id, false);
return response::create_success(req.id, result).to_json();
}
void server::send_request(const std::string& session_id, const std::string& method, const json& params) {
void server::send_request(const std::string& client_address, const std::string& method, const json& params) {
// Check if the method is ping or logging
bool is_allowed_before_init = (method == "ping" || method == "logging");
// Check if client is initialized or if this is an allowed method
if (!is_allowed_before_init && !is_session_initialized(session_id)) {
if (!is_allowed_before_init && !is_client_initialized(client_address)) {
// Client not initialized and method is not allowed before initialization
std::cerr << "Cannot send " << method << " request to session " << session_id
std::cerr << "Cannot send " << method << " request to client " << client_address
<< " before it is initialized" << std::endl;
return;
}
@ -679,77 +454,17 @@ void server::send_request(const std::string& session_id, const std::string& meth
// This would typically involve sending an HTTP request to the client
}
// Check if a session is initialized
bool server::is_session_initialized(const std::string& session_id) const {
// Check if a client is initialized
bool server::is_client_initialized(const std::string& client_address) const {
std::lock_guard<std::mutex> lock(mutex_);
auto it = session_initialized_.find(session_id);
return (it != session_initialized_.end() && it->second);
auto it = client_initialized_.find(client_address);
return (it != client_initialized_.end() && it->second);
}
// Set session initialization status
void server::set_session_initialized(const std::string& session_id, bool initialized) {
// Set client initialization status
void server::set_client_initialized(const std::string& client_address, bool initialized) {
std::lock_guard<std::mutex> lock(mutex_);
session_initialized_[session_id] = initialized;
}
// Generate a random session ID
std::string server::generate_session_id() const {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(0, 15);
std::stringstream ss;
ss << std::hex;
for (int i = 0; i < 32; ++i) {
ss << dis(gen);
}
return ss.str();
}
void server::print_status() const {
std::lock_guard<std::mutex> lock(mutex_);
std::cerr << "=== MCP服务器状态 ===" << std::endl;
std::cerr << "服务器地址: " << host_ << ":" << port_ << std::endl;
std::cerr << "运行状态: " << (running_ ? "运行中" : "已停止") << std::endl;
std::cerr << "SSE端点: " << sse_endpoint_ << std::endl;
std::cerr << "消息端点前缀: " << msg_endpoint_ << std::endl;
std::cerr << "活跃会话数: " << session_dispatchers_.size() << std::endl;
for (const auto& [session_id, dispatcher] : session_dispatchers_) {
std::cerr << " - 会话ID: " << session_id << ", 状态: " << (dispatcher->is_closed() ? "已关闭" : "活跃") << std::endl;
}
std::cerr << "SSE线程数: " << sse_threads_.size() << std::endl;
std::cerr << "注册的方法数: " << method_handlers_.size() << std::endl;
for (const auto& [method, _] : method_handlers_) {
std::cerr << " - " << method << std::endl;
}
std::cerr << "注册的通知数: " << notification_handlers_.size() << std::endl;
for (const auto& [method, _] : notification_handlers_) {
std::cerr << " - " << method << std::endl;
}
std::cerr << "注册的资源数: " << resources_.size() << std::endl;
for (const auto& [path, _] : resources_) {
std::cerr << " - " << path << std::endl;
}
std::cerr << "注册的工具数: " << tools_.size() << std::endl;
for (const auto& [name, _] : tools_) {
std::cerr << " - " << name << std::endl;
}
std::cerr << "已初始化的会话数: " << session_initialized_.size() << std::endl;
for (const auto& [session_id, initialized] : session_initialized_) {
std::cerr << " - " << session_id << ": " << (initialized ? "已初始化" : "未初始化") << std::endl;
}
std::cerr << "======================" << std::endl;
client_initialized_[client_address] = initialized;
}
} // namespace mcp