From 2cdeb965c37d3d11c8691edd9405d361673923cd Mon Sep 17 00:00:00 2001 From: taynpg Date: Wed, 18 Dec 2024 16:55:32 +0800 Subject: [PATCH] =?UTF-8?q?change=EF=BC=9A=E6=9B=B4=E6=94=B9=E4=B8=AD?= =?UTF-8?q?=E8=BD=AC=E6=96=B9=E5=BC=8F=EF=BC=88=E6=9A=82=E6=9C=AA=E8=B0=83?= =?UTF-8?q?=E9=80=9A=EF=BC=89=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/client.cpp | 65 +++++++++-------- client/client.h | 12 ++-- server/server.cpp | 174 +++++++++++++++++++++------------------------- server/server.h | 16 ++--- util/util.h | 4 +- 5 files changed, 126 insertions(+), 145 deletions(-) diff --git a/client/client.cpp b/client/client.cpp index 920758e..a10c0a4 100644 --- a/client/client.cpp +++ b/client/client.cpp @@ -174,6 +174,11 @@ bool CClient::down_one_file(const std::string& id, const std::string& file) fs::path remote_file(ofen::COfPath::normalize(down_->cur_remote_file_)); down_->cur_file_ = COfPath::to_full(remote_file.filename().string()); logger_->warn("Start Down => {} To {}", down_->cur_remote_file_, down_->cur_file_); + down_->file_ = fopen(down_->cur_file_.c_str(), "wb"); + if (down_->file_ == nullptr) { + logger_->error("Open {} Failed.", down_->cur_file_); + return false; + } // 请求下载文件 std::shared_ptr buf = std::make_shared(); @@ -290,15 +295,6 @@ void CClient::handle_frame(CFrameBuffer* buf) } break; } - // (客户服务都不是指Server)如果是客户端啥也不干,服务端则开始发送数据 - case TYPE_READY_TRANS: { - if (buf->mark_ != 0) { - break; - } - work_key_ = buf->fid_; - send_pool_->submit([&]() { send_file_data_th(); }); - break; - } // 能接收到 TRANS 一定是客户端(这里不是指Server) case TYPE_TRANS_FILE: { auto ws = fwrite(buf->data_, 1, buf->len_, down_->file_); @@ -308,34 +304,25 @@ void CClient::handle_frame(CFrameBuffer* buf) break; } case TYPE_OPEN_FILE: { - std::shared_ptr t = nullptr; - std::string id = buf->fid_; - if (buf->mark_ == 0) { + char* keys = nullptr; + { std::lock_guard lock(mutex_); up_[buf->fid_] = std::make_shared(); - t = up_[buf->fid_]; #ifdef _WIN32 - t->cur_file_ = CCodec::u8ToGBK(std::string(buf->data_, buf->len_)); + up_[buf->fid_]->cur_file_ = CCodec::u8ToGBK(std::string(buf->data_, buf->len_)); #else - t->cur_file_ = std::string(buf->data_, buf->len_); + up_[buf->fid_]->cur_file_ = std::string(buf->data_, buf->len_); #endif - t->file_ = fopen(t->cur_file_.c_str(), "rb"); - } else { - t = down_; - t->file_ = fopen(t->cur_file_.c_str(), "wb"); + up_[buf->fid_]->file_ = fopen(up_[buf->fid_]->cur_file_.c_str(), "rb"); + if (up_[buf->fid_]->file_ == nullptr) { + logger_->error("Ready Send File {} Open Failed.", up_[buf->fid_]->cur_file_); + break; + } + keys = new char[512](); + std::snprintf(keys, 512, "%s", buf->fid_.c_str()); } - if (t->file_ == nullptr) { - logger_->error("open file {} failed.", t->cur_file_); - report_trans_ret(TRANS_FAILED, buf->mark_ == 0 ? id : ""); - break; - } - std::shared_ptr tmp = std::make_shared(); - tmp->type_ = TYPE_READY_TRANS; - tmp->mark_ = buf->mark_ == 0 ? 1 : 0; - tmp->tid_ = buf->fid_; - if (!send_frame(tmp.get())) { - logger_->error("TYPE_OPEN_FILE send ready failed."); - report_trans_ret(TRANS_FAILED, buf->mark_ == 0 ? id : ""); + if (keys) { + send_pool_->submit([&]() { send_file_data_th(keys); }); } break; } @@ -348,15 +335,27 @@ void CClient::handle_frame(CFrameBuffer* buf) down_->trans_state_ = TRANS_DONE; break; } + case TYPE_OFFLINE: { + if (buf->mark_) { + report_trans_ret(TRANS_FAILED, buf->fid_); + break; + } + report_trans_ret(TRANS_FAILED); + break; + } default: break; } } -void CClient::send_file_data_th() +void CClient::send_file_data_th(char* keys) { - std::string str_key = work_key_; + std::string str_key(keys); std::shared_ptr t = nullptr; + std::shared_ptr deleter(new int(0), [&](int* p) { + delete p; + delete[] keys; + }); { std::lock_guard lock(mutex_); diff --git a/client/client.h b/client/client.h index e1ae811..1531ac1 100644 --- a/client/client.h +++ b/client/client.h @@ -1,12 +1,12 @@ #pragma once +#include "file_oper.h" +#include +#include #include +#include +#include #include #include -#include -#include -#include "file_oper.h" -#include -#include using namespace ofen; struct DownClientInfo { @@ -51,7 +51,7 @@ private: private: void handle_frame(CFrameBuffer* buf); - void send_file_data_th(); + void send_file_data_th(char* keys); void hearts(); private: diff --git a/server/server.cpp b/server/server.cpp index 5f9ad2c..37f0771 100644 --- a/server/server.cpp +++ b/server/server.cpp @@ -4,21 +4,15 @@ using namespace ofen; constexpr int g_MaxCacheLen = 1024 * 1024 * 50; -constexpr int g_ParseThreadNum = 1; CTcpServer::CTcpServer(asio::io_context& io_context, const std::shared_ptr& logger) : io_context_(io_context), logger_(logger), acceptor_(io_context) { th_run_ = true; - handle_pool_ = std::make_shared(g_ParseThreadNum); - handle_pool_->init(); - for (int i = 0; i < g_ParseThreadNum; ++i) { - handle_pool_->submit([&]() { handle_frame(); }); - } } + CTcpServer::~CTcpServer() { th_run_ = false; - handle_pool_->close_wait_all(); } bool CTcpServer::start(unsigned short port) @@ -86,98 +80,85 @@ void CTcpServer::get_client_list(CFrameBuffer** buf) tbuf->len_ = std::snprintf(tbuf->data_, msg.size() + 1, "%s", msg.data()); } -void CTcpServer::handle_frame() +void CTcpServer::trans_data(CFrameBuffer* buf) { - CFrameBuffer* buf = nullptr; - while (th_run_) { - { - std::lock_guard lock(buf_mut_); - if (!cache_.empty()) { - buf = cache_.front(); - cache_.pop_front(); - } - } - if (!buf) { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - continue; - } + std::shared_ptr fcli = nullptr; + std::shared_ptr tcli = nullptr; - switch (buf->type_) { - case TYPE_GET_LIST: { - logger_->info("[{}] GetList.", buf->fid_); - get_client_list(&buf); - std::lock_guard lock(cli_mut_); - if (client_map_.count(buf->fid_)) { - auto& cli = client_map_[buf->fid_]; - if (!send_frame(cli->socket_, buf)) { - logger_->error("GetList send failed."); - } - } - break; + { + std::lock_guard lock(cli_mut_); + if (client_map_.count(buf->fid_)) { + fcli = client_map_[buf->fid_]; } - case TYPE_UP_LIST: { - std::string files_path = std::string(buf->data_, buf->len_); -#ifdef _WIN32 - std::string turn_files_path = CCodec::u8ToGBK(files_path); -#else - std::string turn_files_path(files_path); -#endif - logger_->info("[{}] UpList. {}", buf->fid_, turn_files_path); - std::lock_guard lock(cli_mut_); - if (client_map_.count(buf->fid_)) { - auto& cli = client_map_[buf->fid_]; - cli->task_ = files_path; - cli->time_ = OfUtil::now_time(); - } - break; + if (client_map_.count(buf->tid_)) { + tcli = client_map_[buf->tid_]; } - case TYPE_CANCEL_LIST: { - logger_->info("[{}] Cancle Task.", buf->fid_); - std::lock_guard lock(cli_mut_); - if (client_map_.count(buf->fid_)) { - auto& cli = client_map_[buf->fid_]; - cli->task_.clear(); - } - break; - } - // 两边发送OPEN - case TYPE_OPEN_FILE: { - std::lock_guard lock(cli_mut_); - if (client_map_.count(buf->tid_)) { - auto& cli = client_map_[buf->tid_]; - if (!send_frame(cli->socket_, buf)) { - logger_->error("[{}] turn tid_ ailed to {}", buf->fid_, buf->tid_); - } - } - if (client_map_.count(buf->fid_)) { - auto& cli = client_map_[buf->fid_]; - buf->mark_ = 1; - buf->fid_ = buf->tid_; - if (!send_frame(cli->socket_, buf)) { - logger_->error("[{}] turn fid_ failed to {}", buf->fid_, buf->tid_); - } - } - break; - }; - case TYPE_TRANS_DONE: - case TYPE_READY_TRANS: - case TYPE_TRANS_FILE: { - std::lock_guard lock(cli_mut_); - if (client_map_.count(buf->tid_)) { - auto& cli = client_map_[buf->tid_]; - if (!send_frame(cli->socket_, buf)) { - logger_->error("[{}] turn failed to {}", buf->fid_, buf->tid_); - } - } - break; - } - default: - logger_->warn("No Mathched type."); - break; - } - delete buf; - buf = nullptr; } + + switch (buf->type_) { + case TYPE_GET_LIST: { + logger_->info("[{}] GetList.", buf->fid_); + get_client_list(&buf); + if (fcli && !send_frame(fcli->socket_, buf)) { + logger_->error("GetList send failed."); + } + break; + } + case TYPE_UP_LIST: { + std::string files_path = std::string(buf->data_, buf->len_); +#ifdef _WIN32 + std::string turn_files_path = CCodec::u8ToGBK(files_path); +#else + std::string turn_files_path(files_path); +#endif + logger_->info("[{}] UpList. {}", buf->fid_, turn_files_path); + if (fcli) { + fcli->task_ = files_path; + fcli->time_ = OfUtil::now_time(); + } + break; + } + case TYPE_CANCEL_LIST: { + logger_->info("[{}] Cancle Task.", buf->fid_); + if (fcli) { + fcli->task_.clear(); + fcli->time_.clear(); + } + break; + } + // 两边发送OPEN + case TYPE_OPEN_FILE: + case TYPE_TRANS_DONE: + case TYPE_TRANS_FILE: { + if (check_double(fcli, tcli, buf) && !send_frame(tcli->socket_, buf)) { + logger_->error("Send from {} to {} failed Or One Offline.", buf->fid_, buf->tid_); + } + break; + } + default: + logger_->warn("No Mathched type."); + break; + } +} + +bool CTcpServer::check_double(std::shared_ptr& fcli, std::shared_ptr& tcli, + CFrameBuffer* buf) +{ + if (fcli == nullptr && tcli) { + buf->type_ = TYPE_OFFLINE; + send_frame(tcli->socket_, buf); + return false; + } + if (tcli == nullptr && fcli) { + std::swap(buf->fid_, buf->tid_); + buf->type_ = TYPE_OFFLINE; + send_frame(fcli->socket_, buf); + return false; + } + if (tcli == nullptr && fcli == nullptr) { + return false; + } + return true; } void CTcpServer::accept_client() @@ -246,8 +227,9 @@ void CTcpServer::th_client(std::shared_ptr socket, const continue; } frame->fid_ = client_key; - std::lock_guard lock(buf_mut_); - cache_.push_back(frame); + // 直接转发,不加入缓存。 + trans_data(frame); + delete frame; continue; } break; diff --git a/server/server.h b/server/server.h index ad9a267..76f02b5 100644 --- a/server/server.h +++ b/server/server.h @@ -1,7 +1,7 @@ #pragma once +#include #include #include -#include #include #include #include @@ -36,16 +36,18 @@ private: void get_client_list(CFrameBuffer** buf); private: - void handle_frame(); + void trans_data(CFrameBuffer* buf); + bool check_double(std::shared_ptr& fcli, std::shared_ptr& tcli, + CFrameBuffer* buf); private: void accept_client(); void th_client(std::shared_ptr socket, const std::string& client_key); - /// @brief 不删除 buf - /// @param socket - /// @param buf - /// @return + /// @brief 不删除 buf + /// @param socket + /// @param buf + /// @return bool send_frame(std::shared_ptr socket, CFrameBuffer* buf); private: @@ -56,8 +58,6 @@ private: std::map> client_map_; std::map client_threads_; std::mutex cli_mut_; - std::list cache_; std::mutex buf_mut_; - std::shared_ptr handle_pool_; std::string server_ip_; }; \ No newline at end of file diff --git a/util/util.h b/util/util.h index 3f61407..c9c431a 100644 --- a/util/util.h +++ b/util/util.h @@ -16,11 +16,11 @@ enum FrameType : int16_t { TYPE_OPEN_FILE, TYPE_TRANS_FILE, TYPE_TRANS_DONE, - TYPE_READY_TRANS, TYPE_INTERRUPT, TYPE_NO_HIT_TASK, TYPE_WAITTING, - TYPE_HEARTS + TYPE_HEARTS, + TYPE_OFFLINE }; using namespace ofen;