#include #include #include #include #include #include namespace asio = boost::asio; class WorkerCoroutine { public: WorkerCoroutine(asio::io_context& io, int id) : id_(id), isOk_(false), calData_(0) { } ~WorkerCoroutine() { std::cout << "Worker " << id_ << " 销毁\n"; } // 注意:这里返回asio::awaitable的引用,避免拷贝 asio::awaitable Run() { std::cout << "Worker " << id_ << " 开始运行, ThreadID:" << std::this_thread::get_id() << "\n"; isOk_ = false; bool success = co_await simulateCal(); if (!success) { std::cout << "Worker " << id_ << " 计算失败\n"; co_return false; } std::cout << "Worker " << id_ << " 计算完成: " << calData_ << "\n"; isOk_ = true; co_return true; } int GetID() const { return id_; } int GetCalData() const { return calData_; } bool IsOk() const { return isOk_; } private: asio::awaitable simulateCal() { std::cout << "Worker " << id_ << " 模拟计算开始\n"; // 模拟一些计算 calData_ = id_ * 100 + 12; // 调用阻塞函数 co_return co_await readBlockCoroutine(); } // 在协程中执行阻塞操作的正确方式 asio::awaitable readBlockCoroutine() { std::cout << "Worker " << id_ << " 开始阻塞操作 (线程: " << std::this_thread::get_id() << ")\n"; // 将阻塞操作转移到线程池执行 asio::thread_pool pool(1); // 使用独立的线程池 auto result = co_await asio::co_spawn( pool, [this]() -> asio::awaitable { // 真正的阻塞操作在线程池线程执行 std::cout << "Worker " << id_ << " 在线程池中执行 (线程: " << std::this_thread::get_id() << ")\n"; // 这是阻塞操作 std::this_thread::sleep_for(std::chrono::seconds(1 + (id_ % 3))); std::cout << "Worker " << id_ << " 阻塞操作结束\n"; co_return true; }, asio::use_awaitable); pool.join(); // 等待线程池完成 co_return result; } private: int id_; bool isOk_; int calData_; }; // 版本1: 使用co_spawn启动多个协程 int testMultipleCoroutinesV1() { std::cout << "\n=== 测试版本1: 使用co_spawn启动多个Worker ===" << std::endl; asio::io_context io; const int num_workers = 3; // 创建Worker实例 std::vector> workers; for (int i = 0; i < num_workers; ++i) { workers.push_back(std::make_shared(io, i + 1)); } // 使用vector存储future std::vector> futures; // 启动所有协程 for (const auto& worker : workers) { // 注意:这里捕获worker的shared_ptr,确保生命周期 futures.push_back(asio::co_spawn( io, [worker]() -> asio::awaitable { // 直接调用Run(),不需要再次包装 co_return co_await worker->Run(); }, asio::use_future)); } // 运行io_context std::thread io_thread([&io]() { std::cout << "IO线程开始运行\n"; io.run(); std::cout << "IO线程结束\n"; }); // 主线程可以继续工作 std::cout << "主线程继续工作...\n"; for (int i = 0; i < 3; ++i) { std::cout << "主线程工作 " << i + 1 << "\n"; std::this_thread::sleep_for(std::chrono::milliseconds(500)); } // 等待所有结果 std::cout << "\n等待Worker结果...\n"; for (size_t i = 0; i < futures.size(); ++i) { try { bool result = futures[i].get(); std::cout << "Worker " << workers[i]->GetID() << " 结果: " << (result ? "成功" : "失败") << ", 数据: " << workers[i]->GetCalData() << "\n"; } catch (const std::exception& e) { std::cerr << "Worker " << workers[i]->GetID() << " 异常: " << e.what() << "\n"; } } io.stop(); io_thread.join(); return 0; } // 版本2: 更简单的lambda方式 int testMultipleCoroutinesV2() { std::cout << "\n\n=== 测试版本2: 直接使用lambda协程 ===" << std::endl; asio::io_context io; const int num_tasks = 5; // 存储结果 std::vector> futures; std::atomic completed{0}; // 启动多个协程任务 for (int i = 0; i < num_tasks; ++i) { auto task_id = i + 1; // 使用lambda创建协程 auto task = [task_id]() -> asio::awaitable { std::cout << "任务 " << task_id << " 开始 (协程线程: " << std::this_thread::get_id() << ")\n"; // 模拟异步操作 asio::steady_timer timer(asio::any_io_executor{}, std::chrono::seconds(1)); co_await timer.async_wait(asio::use_awaitable); // 模拟阻塞操作 std::cout << "任务 " << task_id << " 执行阻塞操作\n"; asio::thread_pool pool(1); co_await asio::co_spawn( pool, [task_id]() -> asio::awaitable { std::this_thread::sleep_for(std::chrono::seconds(task_id % 3 + 1)); co_return task_id * 100; }, asio::use_awaitable); pool.join(); std::cout << "任务 " << task_id << " 完成\n"; co_return task_id * 100; }; // 启动协程 futures.push_back(asio::co_spawn(io, task, asio::use_future)); } // 运行io_context asio::thread_pool io_thread_pool(1); asio::post(io_thread_pool, [&io]() { io.run(); }); // 等待所有任务完成 std::cout << "\n等待所有任务完成...\n"; for (int i = 0; i < num_tasks; ++i) { try { int result = futures[i].get(); completed++; std::cout << "任务 " << (i + 1) << " 返回: " << result << "\n"; } catch (const std::exception& e) { std::cerr << "任务 " << (i + 1) << " 异常: " << e.what() << "\n"; } } io.stop(); io_thread_pool.join(); std::cout << "完成 " << completed << "/" << num_tasks << " 个任务\n"; return 0; } // 版本3: 使用detached启动(不等待结果) int testMultipleCoroutinesV3() { std::cout << "\n\n=== 测试版本3: 使用detached启动 ===" << std::endl; asio::io_context io; const int num_tasks = 4; std::atomic started{0}; std::atomic completed{0}; std::mutex cout_mutex; // 启动协程但不等待结果 for (int i = 0; i < num_tasks; ++i) { auto task_id = i + 1; asio::co_spawn( io, [task_id, &started, &completed, &cout_mutex]() -> asio::awaitable { started++; { std::lock_guard lock(cout_mutex); std::cout << "任务 " << task_id << " 开始\n"; } // 模拟工作 asio::thread_pool pool(1); co_await asio::co_spawn( pool, [task_id]() -> asio::awaitable { std::this_thread::sleep_for(std::chrono::seconds(1)); co_return; }, asio::use_awaitable); pool.join(); completed++; { std::lock_guard lock(cout_mutex); std::cout << "任务 " << task_id << " 完成\n"; } co_return; }, asio::detached); } // 运行io_context直到所有任务完成 std::cout << "启动 " << started << " 个任务\n"; asio::thread_pool io_pool(2); asio::post(io_pool, [&io]() { io.run(); }); // 等待所有任务完成 while (completed < num_tasks) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); } io.stop(); io_pool.join(); std::cout << "所有 " << completed << " 个任务完成\n"; return 0; } // 版本4: 修复原始代码中的问题 int testRunCoroutineFixed() { std::cout << "\n\n=== 修复原始代码 ===" << std::endl; asio::io_context io; // 创建Worker实例 WorkerCoroutine wk1(io, 1); WorkerCoroutine wk2(io, 2); // 启动协程 std::future future1 = asio::co_spawn(io, [&wk1]() -> asio::awaitable { co_return co_await wk1.Run(); }, asio::use_future); std::future future2 = asio::co_spawn(io, [&wk2]() -> asio::awaitable { co_return co_await wk2.Run(); }, asio::use_future); // 运行io_context asio::thread_pool io_pool(1); asio::post(io_pool, [&io]() { io.run(); }); std::cout << "开始异步计算,主线程可以继续...\n"; // 主线程可以继续工作 for (int i = 0; i < 3; ++i) { std::cout << "主线程工作 " << i + 1 << "\n"; std::this_thread::sleep_for(std::chrono::milliseconds(300)); } // 等待结果 bool result1 = false, result2 = false; try { result1 = future1.get(); std::cout << "Worker1 结果: " << (result1 ? "成功" : "失败") << "\n"; } catch (const std::exception& e) { std::cerr << "Worker1 异常: " << e.what() << "\n"; } try { result2 = future2.get(); std::cout << "Worker2 结果: " << (result2 ? "成功" : "失败") << "\n"; } catch (const std::exception& e) { std::cerr << "Worker2 异常: " << e.what() << "\n"; } io.stop(); io_pool.join(); return (result1 && result2) ? 0 : 1; } int main() { #ifdef _WIN32 SetConsoleOutputCP(CP_UTF8); #endif std::cout << "=== C++协程多实例测试 ===\n"; // 测试各种方式 testMultipleCoroutinesV1(); // testMultipleCoroutinesV2(); // testMultipleCoroutinesV3(); // testRunCoroutineFixed(); return 0; }