diff --git a/README.md b/README.md index e8145a3..efccb92 100644 --- a/README.md +++ b/README.md @@ -24,10 +24,6 @@ mark == 1 表示,服务客户端数据。 - 非`transm`客户端链接到服务器,发送不能识别的数据包格式,将导致`buffer`无限增加。 -- 服务端如果最大上载速度下载速度(较慢)不一致,将导致大量数据堆积在`Server`端内存等待`Client`端缓慢处理。 - -- 如果`client`端在传输数据的过程中异常关闭,`Server`端需要丢弃与该客户端相关的数据包。 - # 注意 -- 如果两个`transmc`客户端在同一台机器上同时收发同一个文件将导致文件丢失损坏。 \ No newline at end of file +- 如果两个`transmc`客户端在同一台机器上同时收发同一个文件将导致文件丢失损坏。 diff --git a/client/client.cpp b/client/client.cpp index a094bbc..78211d7 100644 --- a/client/client.cpp +++ b/client/client.cpp @@ -107,6 +107,10 @@ bool CClient::get_task_list() bool CClient::down_task(const std::string& param) { + if (downloading_) { + logger_->warn("Have Task Downloading, Please wait....."); + return false; + } int id = std::stoi(param); if (!task_list_.count(id)) { logger_->error("No matched id[{}] in task list.", id); @@ -212,6 +216,10 @@ void CClient::report_trans_ret(TransState state, const std::string& key) std::shared_ptr t = nullptr; if (key.empty()) { t = down_; + downloading_ = false; + if (th_down_active_.joinable()) { + th_down_active_.join(); + } } else { std::lock_guard lock(mutex_); if (up_.count(key)) { @@ -243,7 +251,6 @@ bool CClient::send_frame(CFrameBuffer* buf) } std::lock_guard lock(send_mut_); if (!client_->send(out_buf, out_len)) { - logger_->error("{} send failed.", __FUNCTION__); delete[] out_buf; return false; } @@ -300,6 +307,10 @@ void CClient::handle_frame(CFrameBuffer* buf) } // 能接收到 TRANS 一定是客户端(这里不是指Server) case TYPE_TRANS_FILE: { + if (!downloading_) { + downloading_ = true; + th_down_active_ = std::thread([&]() { judget_down_active(); }); + } auto ws = fwrite(buf->data_, 1, buf->len_, down_->file_); if (static_cast(ws) != buf->len_) { logger_->warn("no matched write and data."); @@ -340,9 +351,18 @@ void CClient::handle_frame(CFrameBuffer* buf) } case TYPE_OFFLINE: { if (buf->mark_) { - report_trans_ret(TRANS_FAILED, buf->fid_); + std::lock_guard lock(mutex_); + if (!up_.count(buf->fid_)) { + logger_->warn("Offline no match."); + break; + } + auto t = up_[buf->fid_]; + t->trans_state_ = TRANS_BREAK; break; } + if (!down_->cur_remote_file_.empty()) { + logger_->warn("Stop Down {} From {}.", down_->cur_remote_file_, buf->fid_); + } report_trans_ret(TRANS_FAILED); break; } @@ -372,12 +392,18 @@ void CClient::send_file_data_th(const char* keys) buf->data_ = new char[g_BuffSize]{}; buf->mark_ = 1; while (!feof(t->file_)) { + if (t->trans_state_ == TRANS_BREAK) { + logger_->warn("Stop Trans {} To {} failed.", t->cur_file_, str_key); + report_trans_ret(TRANS_FAILED, str_key); + return; + } buf->len_ = fread(buf->data_, 1, g_BuffSize, t->file_); if (!send_frame(buf.get())) { report_trans_ret(TRANS_FAILED, str_key); - logger_->error("send_file_data_th send failed."); + logger_->error("Stop Trans {} To {} failed.", t->cur_file_, str_key); return; } + //std::this_thread::sleep_for(std::chrono::milliseconds(10)); } buf->type_ = TYPE_TRANS_DONE; @@ -400,3 +426,14 @@ void CClient::hearts() } } } + +void CClient::judget_down_active() +{ + std::shared_ptr buf = std::make_shared(); + buf->type_ = TYPE_JUDGE_ACTIVE; + buf->tid_ = down_->cur_remote_id_; + while (downloading_ && th_run_) { + std::this_thread::sleep_for(std::chrono::milliseconds(2000)); + send_frame(buf.get()); + } +} diff --git a/client/client.h b/client/client.h index 7e52520..370458c 100644 --- a/client/client.h +++ b/client/client.h @@ -18,7 +18,8 @@ enum TransState { TRANS_FAILED, TRANS_ING, TRANS_REDAY, - TRANS_DONE + TRANS_DONE, + TRANS_BREAK }; struct TransInfomation { @@ -53,6 +54,7 @@ private: void handle_frame(CFrameBuffer* buf); void send_file_data_th(const char* keys); void hearts(); + void judget_down_active(); private: std::mutex mutex_; @@ -61,6 +63,7 @@ private: std::thread hearts_; CThreadSleep sleep_; bool th_run_{false}; + bool downloading_{false}; std::shared_ptr logger_; asio::io_context io_context_; std::shared_ptr client_; @@ -69,4 +72,5 @@ private: std::shared_ptr down_; std::vector ths_; std::map> up_; + std::thread th_down_active_; }; \ No newline at end of file diff --git a/server/server.cpp b/server/server.cpp index 6cd8f1c..dcd68a6 100644 --- a/server/server.cpp +++ b/server/server.cpp @@ -126,6 +126,18 @@ void CTcpServer::trans_data(CFrameBuffer* buf) } break; } + case TYPE_JUDGE_ACTIVE: { + if (fcli && tcli) { + break; + } + if (fcli && tcli == nullptr) { + buf->type_ = TYPE_OFFLINE; + std::swap(buf->fid_, buf->tid_); + send_frame(fcli->socket_, buf); + break; + } + break; + } // 两边发送OPEN case TYPE_OPEN_FILE: case TYPE_TRANS_DONE: diff --git a/util/util.h b/util/util.h index c9c431a..e24c38c 100644 --- a/util/util.h +++ b/util/util.h @@ -20,7 +20,8 @@ enum FrameType : int16_t { TYPE_NO_HIT_TASK, TYPE_WAITTING, TYPE_HEARTS, - TYPE_OFFLINE + TYPE_OFFLINE, + TYPE_JUDGE_ACTIVE }; using namespace ofen;