From 3a68984857fd2f272ec50ffe8569ba9179291e42 Mon Sep 17 00:00:00 2001 From: taynpg Date: Wed, 18 Dec 2024 13:55:56 +0800 Subject: [PATCH] =?UTF-8?q?add=EF=BC=9A=E6=B7=BB=E5=8A=A0=E5=BF=83?= =?UTF-8?q?=E8=B7=B3=E6=9C=BA=E5=88=B6=EF=BC=8C=E6=A3=80=E6=B5=8B=E4=BC=A0?= =?UTF-8?q?=E8=BE=93=E5=A4=B1=E8=B4=A5=E6=97=B6=E8=87=AA=E5=8A=A8=E5=8F=91?= =?UTF-8?q?=E7=8E=B0=E6=8F=90=E7=A4=BA=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/client.cpp | 37 +++++++++++++++++++++++++++++++++++-- client/client.h | 6 ++++++ ofen | 2 +- server/server.cpp | 4 ++++ util/util.h | 3 ++- 5 files changed, 48 insertions(+), 4 deletions(-) diff --git a/client/client.cpp b/client/client.cpp index b43fd0b..920758e 100644 --- a/client/client.cpp +++ b/client/client.cpp @@ -13,10 +13,13 @@ CClient::CClient(const std::shared_ptr& logger) : logger_(logger send_pool_ = std::make_shared(g_SendPoolNum); send_pool_->init(); supported_.push_back("Get"); + sleep_.set_timeout(2000); } CClient::~CClient() { + th_run_ = false; + sleep_.contiune(); if (down_->file_) { fclose(down_->file_); down_->file_ = nullptr; @@ -28,20 +31,30 @@ CClient::~CClient() item.second->file_ = nullptr; } } + if (hearts_.joinable()) { + hearts_.join(); + } } void CClient::run(const std::string& ip, const std::string& port) { + th_run_ = true; if (!client_->connect(ip, port)) { logger_->info("{} connect err.", __FUNCTION__); return; } client_->register_func([&](CFrameBuffer* buf) { handle_frame(buf); }); client_->async_recv(); + hearts_ = std::thread([&]() { hearts(); }); std::thread thread([&]() { io_context_.run(); }); logger_->warn("SupportCmd:Get|Up|Down|Cancel"); char line[512]{}; while (std::cin.getline(line, 512)) { + + if (!th_run_) { + break; + } + std::string cmd_input(line); if (cmd_input == "end") { break; @@ -101,7 +114,9 @@ bool CClient::down_task(const std::string& param) // 开始传输文件 for (const auto& item : vec) { - down_one_file(task_list_[id]->id, item); + if (!down_one_file(task_list_[id]->id, item)) { + break; + } } return true; } @@ -175,8 +190,12 @@ bool CClient::down_one_file(const std::string& id, const std::string& file) down_->trans_state_ = TRANS_REDAY; while (down_->trans_state_ != TRANS_DONE) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); + if (!th_run_) { + logger_->error("Interrup When Receive File."); + report_trans_ret(TRANS_FAILED); + return false; + } } - return true; } @@ -214,6 +233,7 @@ bool CClient::send_frame(CFrameBuffer* buf) logger_->error("{} pack failed.", __FUNCTION__); return false; } + std::lock_guard lock(send_mut_); if (!client_->send(out_buf, out_len)) { logger_->error("{} send failed.", __FUNCTION__); delete[] out_buf; @@ -369,3 +389,16 @@ void CClient::send_file_data_th() report_trans_ret(TRANS_DONE, str_key); logger_->debug("Trans File {} To {} Done !!!", t->cur_file_, str_key); } + +void CClient::hearts() +{ + std::shared_ptr buf = std::make_shared(); + buf->type_ = TYPE_HEARTS; + while (th_run_) { + sleep_.sleep(); + if (!send_frame(buf.get())) { + logger_->error("{} send failed.", __FUNCTION__); + th_run_ = false; + } + } +} diff --git a/client/client.h b/client/client.h index 0670991..e1ae811 100644 --- a/client/client.h +++ b/client/client.h @@ -6,6 +6,7 @@ #include #include "file_oper.h" #include +#include using namespace ofen; struct DownClientInfo { @@ -51,6 +52,7 @@ private: private: void handle_frame(CFrameBuffer* buf); void send_file_data_th(); + void hearts(); private: std::shared_ptr logger_; @@ -61,6 +63,10 @@ private: std::shared_ptr down_; std::map> up_; std::mutex mutex_; + std::mutex send_mut_; std::shared_ptr send_pool_; std::string work_key_; + std::thread hearts_; + CThreadSleep sleep_; + bool th_run_{false}; }; \ No newline at end of file diff --git a/ofen b/ofen index ab87624..cf6168a 160000 --- a/ofen +++ b/ofen @@ -1 +1 @@ -Subproject commit ab87624a333e385e08abb82ac623bce8da798539 +Subproject commit cf6168a89a784521df0c9d63a049bc025daa1b85 diff --git a/server/server.cpp b/server/server.cpp index 93dcfa2..5f9ad2c 100644 --- a/server/server.cpp +++ b/server/server.cpp @@ -241,6 +241,10 @@ void CTcpServer::th_client(std::shared_ptr socket, const while (true) { auto* frame = CTransProtocal::parse(cache->buffer_); if (frame) { + if (frame->type_ == TYPE_HEARTS) { + delete frame; + continue; + } frame->fid_ = client_key; std::lock_guard lock(buf_mut_); cache_.push_back(frame); diff --git a/util/util.h b/util/util.h index b46832c..3f61407 100644 --- a/util/util.h +++ b/util/util.h @@ -19,7 +19,8 @@ enum FrameType : int16_t { TYPE_READY_TRANS, TYPE_INTERRUPT, TYPE_NO_HIT_TASK, - TYPE_WAITTING + TYPE_WAITTING, + TYPE_HEARTS }; using namespace ofen;