debug:调试通过,待继续深度测试。

This commit is contained in:
taynpg 2024-12-18 22:04:54 +08:00
parent 7ae8a6cf70
commit 6816a9d6a8
5 changed files with 60 additions and 10 deletions

View File

@ -24,10 +24,6 @@ mark == 1 表示,服务客户端数据。
- 非`transm`客户端链接到服务器,发送不能识别的数据包格式,将导致`buffer`无限增加。 - 非`transm`客户端链接到服务器,发送不能识别的数据包格式,将导致`buffer`无限增加。
- 服务端如果最大上载速度下载速度(较慢)不一致,将导致大量数据堆积在`Server`端内存等待`Client`端缓慢处理。
- 如果`client`端在传输数据的过程中异常关闭,`Server`端需要丢弃与该客户端相关的数据包。
# 注意 # 注意
- 如果两个`transmc`客户端在同一台机器上同时收发同一个文件将导致文件丢失损坏。 - 如果两个`transmc`客户端在同一台机器上同时收发同一个文件将导致文件丢失损坏。

View File

@ -107,6 +107,10 @@ bool CClient::get_task_list()
bool CClient::down_task(const std::string& param) bool CClient::down_task(const std::string& param)
{ {
if (downloading_) {
logger_->warn("Have Task Downloading, Please wait.....");
return false;
}
int id = std::stoi(param); int id = std::stoi(param);
if (!task_list_.count(id)) { if (!task_list_.count(id)) {
logger_->error("No matched id[{}] in task list.", id); logger_->error("No matched id[{}] in task list.", id);
@ -212,6 +216,10 @@ void CClient::report_trans_ret(TransState state, const std::string& key)
std::shared_ptr<TransInfomation> t = nullptr; std::shared_ptr<TransInfomation> t = nullptr;
if (key.empty()) { if (key.empty()) {
t = down_; t = down_;
downloading_ = false;
if (th_down_active_.joinable()) {
th_down_active_.join();
}
} else { } else {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
if (up_.count(key)) { if (up_.count(key)) {
@ -243,7 +251,6 @@ bool CClient::send_frame(CFrameBuffer* buf)
} }
std::lock_guard<std::mutex> lock(send_mut_); std::lock_guard<std::mutex> lock(send_mut_);
if (!client_->send(out_buf, out_len)) { if (!client_->send(out_buf, out_len)) {
logger_->error("{} send failed.", __FUNCTION__);
delete[] out_buf; delete[] out_buf;
return false; return false;
} }
@ -300,6 +307,10 @@ void CClient::handle_frame(CFrameBuffer* buf)
} }
// 能接收到 TRANS 一定是客户端(这里不是指Server) // 能接收到 TRANS 一定是客户端(这里不是指Server)
case TYPE_TRANS_FILE: { case TYPE_TRANS_FILE: {
if (!downloading_) {
downloading_ = true;
th_down_active_ = std::thread([&]() { judget_down_active(); });
}
auto ws = fwrite(buf->data_, 1, buf->len_, down_->file_); auto ws = fwrite(buf->data_, 1, buf->len_, down_->file_);
if (static_cast<int>(ws) != buf->len_) { if (static_cast<int>(ws) != buf->len_) {
logger_->warn("no matched write and data."); logger_->warn("no matched write and data.");
@ -340,9 +351,18 @@ void CClient::handle_frame(CFrameBuffer* buf)
} }
case TYPE_OFFLINE: { case TYPE_OFFLINE: {
if (buf->mark_) { if (buf->mark_) {
report_trans_ret(TRANS_FAILED, buf->fid_); std::lock_guard<std::mutex> lock(mutex_);
if (!up_.count(buf->fid_)) {
logger_->warn("Offline no match.");
break; break;
} }
auto t = up_[buf->fid_];
t->trans_state_ = TRANS_BREAK;
break;
}
if (!down_->cur_remote_file_.empty()) {
logger_->warn("Stop Down {} From {}.", down_->cur_remote_file_, buf->fid_);
}
report_trans_ret(TRANS_FAILED); report_trans_ret(TRANS_FAILED);
break; break;
} }
@ -372,12 +392,18 @@ void CClient::send_file_data_th(const char* keys)
buf->data_ = new char[g_BuffSize]{}; buf->data_ = new char[g_BuffSize]{};
buf->mark_ = 1; buf->mark_ = 1;
while (!feof(t->file_)) { while (!feof(t->file_)) {
if (t->trans_state_ == TRANS_BREAK) {
logger_->warn("Stop Trans {} To {} failed.", t->cur_file_, str_key);
report_trans_ret(TRANS_FAILED, str_key);
return;
}
buf->len_ = fread(buf->data_, 1, g_BuffSize, t->file_); buf->len_ = fread(buf->data_, 1, g_BuffSize, t->file_);
if (!send_frame(buf.get())) { if (!send_frame(buf.get())) {
report_trans_ret(TRANS_FAILED, str_key); report_trans_ret(TRANS_FAILED, str_key);
logger_->error("send_file_data_th send failed."); logger_->error("Stop Trans {} To {} failed.", t->cur_file_, str_key);
return; return;
} }
//std::this_thread::sleep_for(std::chrono::milliseconds(10));
} }
buf->type_ = TYPE_TRANS_DONE; buf->type_ = TYPE_TRANS_DONE;
@ -400,3 +426,14 @@ void CClient::hearts()
} }
} }
} }
void CClient::judget_down_active()
{
std::shared_ptr<CFrameBuffer> buf = std::make_shared<CFrameBuffer>();
buf->type_ = TYPE_JUDGE_ACTIVE;
buf->tid_ = down_->cur_remote_id_;
while (downloading_ && th_run_) {
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
send_frame(buf.get());
}
}

View File

@ -18,7 +18,8 @@ enum TransState {
TRANS_FAILED, TRANS_FAILED,
TRANS_ING, TRANS_ING,
TRANS_REDAY, TRANS_REDAY,
TRANS_DONE TRANS_DONE,
TRANS_BREAK
}; };
struct TransInfomation { struct TransInfomation {
@ -53,6 +54,7 @@ private:
void handle_frame(CFrameBuffer* buf); void handle_frame(CFrameBuffer* buf);
void send_file_data_th(const char* keys); void send_file_data_th(const char* keys);
void hearts(); void hearts();
void judget_down_active();
private: private:
std::mutex mutex_; std::mutex mutex_;
@ -61,6 +63,7 @@ private:
std::thread hearts_; std::thread hearts_;
CThreadSleep sleep_; CThreadSleep sleep_;
bool th_run_{false}; bool th_run_{false};
bool downloading_{false};
std::shared_ptr<spdlog::logger> logger_; std::shared_ptr<spdlog::logger> logger_;
asio::io_context io_context_; asio::io_context io_context_;
std::shared_ptr<CTcpClient> client_; std::shared_ptr<CTcpClient> client_;
@ -69,4 +72,5 @@ private:
std::shared_ptr<TransInfomation> down_; std::shared_ptr<TransInfomation> down_;
std::vector<std::thread> ths_; std::vector<std::thread> ths_;
std::map<std::string, std::shared_ptr<TransInfomation>> up_; std::map<std::string, std::shared_ptr<TransInfomation>> up_;
std::thread th_down_active_;
}; };

View File

@ -126,6 +126,18 @@ void CTcpServer::trans_data(CFrameBuffer* buf)
} }
break; break;
} }
case TYPE_JUDGE_ACTIVE: {
if (fcli && tcli) {
break;
}
if (fcli && tcli == nullptr) {
buf->type_ = TYPE_OFFLINE;
std::swap(buf->fid_, buf->tid_);
send_frame(fcli->socket_, buf);
break;
}
break;
}
// 两边发送OPEN // 两边发送OPEN
case TYPE_OPEN_FILE: case TYPE_OPEN_FILE:
case TYPE_TRANS_DONE: case TYPE_TRANS_DONE:

View File

@ -20,7 +20,8 @@ enum FrameType : int16_t {
TYPE_NO_HIT_TASK, TYPE_NO_HIT_TASK,
TYPE_WAITTING, TYPE_WAITTING,
TYPE_HEARTS, TYPE_HEARTS,
TYPE_OFFLINE TYPE_OFFLINE,
TYPE_JUDGE_ACTIVE
}; };
using namespace ofen; using namespace ofen;