cpp-mcp/test/test_mcp_lifecycle_transpor...

184 lines
5.6 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

/**
* @file test_mcp_lifecycle_transport.cpp
* @brief 测试MCP消息生命周期和传输相关功能
*
* 本文件包含对MCP消息生命周期和SSE传输的单元测试基于规范2024-11-05。
*/
#include "mcp_message.h"
#include "mcp_server.h"
#include "mcp_client.h"
#include <gtest/gtest.h>
#include <gmock/gmock.h>
#include <string>
#include <thread>
#include <future>
#include <chrono>
// 测试类,用于设置服务器和客户端
class McpLifecycleTransportTest : public ::testing::Test {
protected:
void SetUp() override {
// 创建服务器
server = std::make_unique<mcp::server>("localhost", 8096);
server->set_server_info("TestServer", "2024-11-05");
// 设置服务器能力
mcp::json capabilities = {
{"tools", {{"listChanged", true}}},
{"transport", {{"sse", true}}}
};
server->set_capabilities(capabilities);
// 注册一个简单的方法
server->register_method("test_method", [](const mcp::json& params) -> mcp::json {
return {{"result", "success"}, {"params_received", params}};
});
// 注册一个通知处理器
server->register_notification("test_notification", [this](const mcp::json& params) {
notification_received = true;
notification_params = params;
});
}
void TearDown() override {
// 停止服务器
if (server && server_thread.joinable()) {
server->stop();
server_thread.join();
}
}
// 启动服务器
void start_server() {
server_thread = std::thread([this]() {
server->start(false);
});
// 等待服务器启动
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
std::unique_ptr<mcp::server> server;
std::thread server_thread;
bool notification_received = false;
mcp::json notification_params;
};
// 测试消息生命周期 - 初始化
TEST_F(McpLifecycleTransportTest, InitializationTest) {
// 启动服务器
start_server();
// 创建客户端
mcp::client client("localhost", 8096);
client.set_timeout(5);
// 测试初始化
bool init_result = client.initialize("TestClient", "1.0.0");
EXPECT_TRUE(init_result);
// 获取服务器能力
mcp::json server_capabilities = client.get_server_capabilities();
EXPECT_TRUE(server_capabilities.contains("tools"));
EXPECT_TRUE(server_capabilities.contains("transport"));
EXPECT_TRUE(server_capabilities["transport"]["sse"].get<bool>());
}
// 测试消息生命周期 - 请求和响应
TEST_F(McpLifecycleTransportTest, RequestResponseTest) {
// 启动服务器
start_server();
// 创建客户端
mcp::client client("localhost", 8096);
client.set_timeout(5);
client.initialize("TestClient", "1.0.0");
// 发送请求并获取响应
mcp::json params = {{"key", "value"}, {"number", 42}};
mcp::response response = client.send_request("test_method", params);
// 验证响应
EXPECT_FALSE(response.is_error());
EXPECT_EQ(response.result["result"], "success");
EXPECT_EQ(response.result["params_received"]["key"], "value");
EXPECT_EQ(response.result["params_received"]["number"], 42);
}
// 测试消息生命周期 - 通知
TEST_F(McpLifecycleTransportTest, NotificationTest) {
// 启动服务器
start_server();
// 创建客户端
mcp::client client("localhost", 8096);
client.set_timeout(5);
client.initialize("TestClient", "1.0.0");
// 发送通知
mcp::json params = {{"event", "update"}, {"status", "completed"}};
client.send_notification("test_notification", params);
// 等待通知处理
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// 验证通知已接收
EXPECT_TRUE(notification_received);
EXPECT_EQ(notification_params["event"], "update");
EXPECT_EQ(notification_params["status"], "completed");
}
// 测试SSE传输 - 使用ping方法测试SSE连接
TEST_F(McpLifecycleTransportTest, SseTransportTest) {
// 启动服务器
start_server();
// 注册一个特殊的方法用于测试SSE连接
server->register_method("sse_test", [](const mcp::json& params) -> mcp::json {
return {{"sse_test_result", true}};
});
// 创建客户端
mcp::client client("localhost", 8096);
client.set_timeout(5);
client.initialize("TestClient", "1.0.0");
// 等待SSE连接建立
std::this_thread::sleep_for(std::chrono::milliseconds(200));
// 测试SSE连接是否正常工作 - 使用ping方法
bool ping_result = client.ping();
EXPECT_TRUE(ping_result);
// 发送请求并获取响应验证SSE连接正常工作
mcp::response response = client.send_request("sse_test");
EXPECT_FALSE(response.is_error());
EXPECT_TRUE(response.result["sse_test_result"].get<bool>());
}
// 测试ping功能
TEST_F(McpLifecycleTransportTest, PingTest) {
// 启动服务器
start_server();
// 创建客户端
mcp::client client("localhost", 8096);
client.set_timeout(5);
client.initialize("TestClient", "1.0.0");
// 测试ping
bool ping_result = client.ping();
EXPECT_TRUE(ping_result);
// 停止服务器
server->stop();
server_thread.join();
// 等待服务器完全停止
std::this_thread::sleep_for(std::chrono::milliseconds(200));
// 再次测试ping应该失败
ping_result = client.ping();
EXPECT_FALSE(ping_result);
}