/** * @file mcp_thread_pool.h * @brief 简单的线程池实现 */ #ifndef MCP_THREAD_POOL_H #define MCP_THREAD_POOL_H #include #include #include #include #include #include #include #include #include namespace mcp { class thread_pool { public: /** * @brief 构造函数 * @param num_threads 线程池中的线程数量 */ explicit thread_pool(size_t num_threads = std::thread::hardware_concurrency()) : stop_(false) { for (size_t i = 0; i < num_threads; ++i) { workers_.emplace_back([this] { while (true) { std::function task; { std::unique_lock lock(queue_mutex_); condition_.wait(lock, [this] { return stop_ || !tasks_.empty(); }); if (stop_ && tasks_.empty()) { return; } task = std::move(tasks_.front()); tasks_.pop(); } task(); } }); } } /** * @brief 析构函数 */ ~thread_pool() { { std::unique_lock lock(queue_mutex_); stop_ = true; } condition_.notify_all(); for (std::thread& worker : workers_) { if (worker.joinable()) { worker.join(); } } } /** * @brief 提交任务到线程池 * @param f 任务函数 * @param args 任务参数 * @return 任务的future */ template auto enqueue(F&& f, Args&&... args) -> std::future::type> { using return_type = typename std::invoke_result::type; auto task = std::make_shared>( std::bind(std::forward(f), std::forward(args)...) ); std::future result = task->get_future(); { std::unique_lock lock(queue_mutex_); if (stop_) { throw std::runtime_error("线程池已停止,无法添加任务"); } tasks_.emplace([task]() { (*task)(); }); } condition_.notify_one(); return result; } private: // 工作线程 std::vector workers_; // 任务队列 std::queue> tasks_; // 互斥锁和条件变量 std::mutex queue_mutex_; std::condition_variable condition_; // 停止标志 std::atomic stop_; }; } // namespace mcp #endif // MCP_THREAD_POOL_H