how-to-use/cpp/common/box_threadpool.hpp

209 lines
5.7 KiB
C++
Raw Permalink Normal View History

2024-03-08 14:01:18 +08:00
#ifndef THREAD_POOL_HEADER
#define THREAD_POOL_HEADER
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
namespace cppbox {
// 线程安全队列
template <typename T> class CQueueMt
{
public:
CQueueMt() = default;
CQueueMt(CQueueMt&& rh) noexcept {}
~CQueueMt() = default;
public:
// 返回队列是否为空
bool IsEmpty()
{
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.empty();
}
// 返回队列数量
size_t Size()
{
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.size();
}
// 添加元素
void Push(T& t)
{
std::unique_lock<std::mutex> lock(m_mutex);
m_queue.emplace(t);
}
// 取出元素
bool Pop(T& t)
{
std::unique_lock<std::mutex> lock(m_mutex);
if (m_queue.empty())
return false;
t = std::move(m_queue.front());
m_queue.pop();
return true;
}
// 清除
void clear()
{
std::unique_lock<std::mutex> lock(m_mutex);
std::queue<T> queue;
std::swap(m_queue, queue);
}
private:
std::mutex m_mutex;
std::queue<T> m_queue;
};
class CBoxThreadPool
{
private:
class CWorker
{
public:
CWorker(CBoxThreadPool* pThreadPool, const size_t nID)
: m_nID(nID), m_pThreadPool(pThreadPool)
{
is_run_ = false;
}
// 重载操作
void operator()()
{
// 1.定义基础函数类 func
std::function<void()> func;
// 2.是否成功取出队列中的元素
bool have = false;
while (m_pThreadPool->is_start_ || !is_run_ || isContinue()) {
// 进入 while 后,标志位置成已进入。
is_run_ = true;
{
// 2.1 为线程环境加锁,互斥访问线程的休眠与唤醒
std::unique_lock<std::mutex> lock(m_pThreadPool->mutex_);
// 2.2 如果队列为空则阻塞并等待条件变量的通知
if (m_pThreadPool->m_queue.IsEmpty())
m_pThreadPool->cv_.wait(lock);
// 2.3 取出任务队列中的元素
have = m_pThreadPool->m_queue.Pop(func);
}
if (have)
func();
}
}
// 检查是否需要继续完成未完成的任务
bool isContinue()
{
if (!m_pThreadPool->is_wait_)
return false;
if (m_pThreadPool->m_queue.IsEmpty())
return false;
return true;
}
private:
size_t m_nID; // 工作ID
CBoxThreadPool* m_pThreadPool; // 所属线程池
bool is_run_; // 是否进入工作了
};
public:
explicit CBoxThreadPool(const int nThreadNum = 4)
: th_cnt_(nThreadNum),
is_start_(false),
is_wait_(false),
threads_(std::vector<std::thread>(nThreadNum))
{
}
~CBoxThreadPool() { ShutDownWaitCurrent(); }
CBoxThreadPool(const CBoxThreadPool&) = delete;
CBoxThreadPool(CBoxThreadPool&&) = delete;
CBoxThreadPool& operator=(const CBoxThreadPool&) = delete;
CBoxThreadPool& operator=(CBoxThreadPool&&) = delete;
public:
// 初始化线程池
void Init()
{
if (!is_start_) {
threads_.resize(th_cnt_);
for (size_t i = 0; i < threads_.size(); ++i) {
threads_.at(i) = std::thread(CWorker(this, i));
}
is_start_ = true;
is_wait_ = false;
return;
}
}
// 等待各线程当前任务完成然后关闭线程池
void ShutDownWaitCurrent()
{
if (!is_start_)
return;
is_wait_ = false;
is_start_ = false;
Close();
m_queue.clear();
}
// 等待所有已提交的任务完成然后关闭线程池
void ShutDownWaitAll()
{
if (!is_start_)
return;
is_wait_ = true;
is_start_ = false;
Close();
m_queue.clear();
}
// 向池子中提交一个待执行函数
template <typename F, typename... Args>
auto Submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))>
{
// 1. 创建一个绑定参数的函数,
// 连接函数和参数定义,特殊函数类型,避免左右值错误
std::function<decltype(f(args...))()> func =
std::bind(std::forward<F>(f), std::forward<Args>(args)...);
// 2. 使用智能指针以便调用拷贝构造
auto pTask =
std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);
// 3. 包装进一个无参数函数
std::function<void()> rf = [pTask]() { (*pTask)(); };
// 4. 入队
m_queue.Push(rf);
// 5. 唤醒一个等待中的线程
cv_.notify_one();
// 6. 返回先前注册的任务指针
return pTask->get_future();
}
private:
void Close()
{
cv_.notify_all();
for (auto& m_thread : threads_) {
if (m_thread.joinable())
m_thread.join();
}
threads_.clear();
}
private:
int th_cnt_; // 线程数
bool is_start_; // 线程池是否启动
bool is_wait_; // 是否等待所有线程结束
std::mutex mutex_; // 线程休眠锁互斥
CQueueMt<std::function<void()>> m_queue;
std::vector<std::thread> threads_;
std::condition_variable cv_; // 环境锁,用于休眠或者唤醒
};
} // namespace cppbox
#endif