From 44fb416ca166aa9c4ce270ff71bea6b3df703d6e Mon Sep 17 00:00:00 2001 From: taynpg Date: Fri, 13 Dec 2024 12:29:47 +0800 Subject: [PATCH] =?UTF-8?q?add=EF=BC=9A=E6=B7=BB=E5=8A=A0=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E6=B1=A0=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/of_util.h | 178 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 176 insertions(+), 2 deletions(-) diff --git a/include/of_util.h b/include/of_util.h index 701d815..f072b2d 100644 --- a/include/of_util.h +++ b/include/of_util.h @@ -1,12 +1,17 @@ #pragma once #include "of_def.hpp" +#include #include +#include +#include +#include +#include #include #include #include +#include +#include #include -#include -#include namespace ofen { template class OfSingleton @@ -37,6 +42,7 @@ class CMutBuffer { public: CMutBuffer() = default; + public: void push(const char* data, int len); int index_of(const char* data, int len, int start_pos = 0); @@ -44,9 +50,177 @@ public: int get_len() const; void remove_of(int start_pos, int len); void clear(); + private: std::vector buffer_; std::mutex mutex_; }; +template class CQueueMt +{ +public: + CQueueMt() = default; + CQueueMt(CQueueMt&& rh) noexcept + { + } + ~CQueueMt() = default; + +public: + bool empty() + { + std::unique_lock lock(mutex_); + return queue_.empty(); + } + size_t size() + { + std::unique_lock lock(mutex_); + return queue_.size(); + } + void push(T& t) + { + std::unique_lock lock(mutex_); + queue_.emplace(t); + } + bool pop(T& t) + { + std::unique_lock lock(mutex_); + if (queue_.empty()) + return false; + t = std::move(queue_.front()); + queue_.pop(); + return true; + } + void clear() + { + std::unique_lock lock(mutex_); + std::queue queue; + std::swap(queue_, queue); + } + +private: + std::mutex mutex_; + std::queue queue_; +}; + +class CThreadPool +{ +private: + class CWorker + { + public: + CWorker(CThreadPool* pool, const size_t id) : id_(id), pool_(pool) + { + is_run_ = false; + } + void operator()() + { + std::function func; + bool have = false; + while (pool_->is_start_ || !is_run_ || is_continue()) { + is_run_ = true; + { + std::unique_lock lock(pool_->mutex_); + if (pool_->queue_.empty()) + pool_->cv_.wait(lock); + have = pool_->queue_.pop(func); + } + + if (have) + func(); + } + } + bool is_continue() + { + if (!pool_->is_wait_) + return false; + if (pool_->queue_.empty()) + return false; + return true; + } + + private: + size_t id_; + CThreadPool* pool_; + bool is_run_; + }; + +public: + explicit CThreadPool(const int num = 4) + : th_cnt_(num), is_start_(false), is_wait_(false), threads_(std::vector(num)) + { + } + ~CThreadPool() + { + close_wait_current(); + } + CThreadPool(const CThreadPool&) = delete; + CThreadPool(CThreadPool&&) = delete; + CThreadPool& operator=(const CThreadPool&) = delete; + CThreadPool& operator=(CThreadPool&&) = delete; + +public: + // 初始化线程池 + void init() + { + if (!is_start_) { + threads_.resize(th_cnt_); + for (size_t i = 0; i < threads_.size(); ++i) { + threads_.at(i) = std::thread(CWorker(this, i)); + } + is_start_ = true; + is_wait_ = false; + return; + } + } + void close_wait_current() + { + if (!is_start_) + return; + is_wait_ = false; + is_start_ = false; + close(); + queue_.clear(); + } + void close_wait_all() + { + if (!is_start_) + return; + is_wait_ = true; + is_start_ = false; + close(); + queue_.clear(); + } + template auto submit(F&& f, Args&&... args) -> std::future + { + std::function func = std::bind(std::forward(f), std::forward(args)...); + auto pTask = std::make_shared>(func); + std::function rf = [pTask]() { (*pTask)(); }; + queue_.push(rf); + cv_.notify_one(); + return pTask->get_future(); + } + +private: + void close() + { + cv_.notify_all(); + + for (auto& m_thread : threads_) { + if (m_thread.joinable()) + m_thread.join(); + } + threads_.clear(); + } + +private: + int th_cnt_; + bool is_start_; + bool is_wait_; + std::mutex mutex_; + + CQueueMt> queue_; + std::vector threads_; + std::condition_variable cv_; +}; + } // namespace ofen