diff --git a/CMakeLists.txt b/CMakeLists.txt index 1e8b489..f265ff9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,7 +23,7 @@ add_subdirectory(util) add_subdirectory(server) add_subdirectory(client) -add_executable(transm_test1 test1.cpp) -target_link_libraries(transm_test1 PRIVATE trans_net trans_util) -add_executable(transm_test2 test2.cpp) -target_link_libraries(transm_test2 PRIVATE trans_net trans_util) \ No newline at end of file +# add_executable(transm_test1 test1.cpp) +# target_link_libraries(transm_test1 PRIVATE trans_net trans_util) +# add_executable(transm_test2 test2.cpp) +# target_link_libraries(transm_test2 PRIVATE trans_net trans_util) \ No newline at end of file diff --git a/README.md b/README.md index 30cb470..c4b1fc4 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # transm -以一个网点作为中转传输文件。 +以一个简易的使用`Server`作为中转传输文件。 # 简要说明 @@ -8,46 +8,6 @@ - `client`从`server`下载文件,如果本地有重复则覆盖。 - 工作方式为`client A`端提交待传输的文件列表到`server`,`client B`端从`server`获取有哪些客户机提交的哪些任务,可以从中下载。 -## 格式(开发用) +mark == 0 表示,请求下载端的数据。 -通讯协议中的`DATA`部分,对于`type`为`1`的类型来讲,统一以下格式(`command`和`param`内容中不能含有`|`): - -**command|param|data** (传输格式) - -**command(空格)param1,param2,param3..** (cmd输入格式) - -### 1.命令 - -`type`:1 - -**Get:** 获取当前挂载到服务器的任务单。 - -**DownTask:** 下载指定的任务清单,`param`为`Get`中列出的名称。 - -**UpTask:** 上载任务单,`param`为文件或者文件夹路径,可多个,使用`,`分隔。 - -### 2.数据 - -`type`:2 - -当`mark`为`0`时表示数据的最后一包,其他数据表示非最后一包。 - -`type`: 199,表示询问在线客户端及挂载任务。 - -`type`: 198,下载清单文件。 - -`type`: 197,上载清单。 - -`type`: 196,取消上载任务。 - -`type`: 195,请求打开文件,返回mark值为1表示OK,为0表示失败。 - -`type`: 194,可以传输文件。 - -`type`: 193,文件数据。 - -`type`: 192,传输结束。 - -`type`:191,异常中断传输。 - -`type`:190,没有此清单。 +mark == 1 表示,服务客户端数据。 diff --git a/client/client.cpp b/client/client.cpp index ff8b6ce..ee31e4a 100644 --- a/client/client.cpp +++ b/client/client.cpp @@ -1,11 +1,16 @@ #include "client.h" +#include #include +#include #include -using namespace ofen; +namespace fs = std::filesystem; +constexpr int g_SendPoolNum = 1; CClient::CClient(const std::shared_ptr& logger) : logger_(logger) { client_ = std::make_shared(io_context_, logger_); + send_pool_ = std::make_shared(g_SendPoolNum); + send_pool_->init(); supported_.push_back("Get"); } @@ -47,12 +52,7 @@ void CClient::run() continue; } if (cmd == "Down") { - int key = param.empty() ? -1 : std::stoi(param); - if (task_list_.count(key)) { - down_task(); - } else { - logger_->error("no task number find."); - } + down_task(vec[1]); continue; } if (cmd == "Up") { @@ -77,10 +77,22 @@ bool CClient::get_task_list() return send_frame(buf.get()); } -bool CClient::down_task() +bool CClient::down_task(const std::string& param) { - // if (send_frame(198, )) - return false; + int id = std::stoi(param); + if (!task_list_.count(id)) { + logger_->error("No matched id[{}] in task list.", id); + return false; + } + down_ = std::make_shared(); + const auto& vec = task_list_[id]->files; + + // 开始传输文件 + for (const auto& item : vec) { + logger_->warn("Start Down => {}", item); + down_one_file(task_list_[id]->id, item); + } + return true; } bool CClient::up_task(const std::string& cmd) @@ -100,9 +112,9 @@ bool CClient::up_task(const std::string& cmd) } std::shared_ptr buf = std::make_shared(); buf->type_ = TYPE_UP_LIST; - buf->data_ = new char[msg.size()]; - std::memset(buf->data_, 0x0, msg.size()); - buf->len_ = std::snprintf(buf->data_, msg.size(), "%s", msg.data()); + buf->data_ = new char[msg.size() + 1]; + std::memset(buf->data_, 0x0, msg.size() + 1); + buf->len_ = std::snprintf(buf->data_, msg.size() + 1, "%s", msg.data()); return send_frame(buf.get()); } @@ -113,6 +125,56 @@ bool CClient::cancel_task() return send_frame(buf.get()); } +bool CClient::down_one_file(const std::string& id, const std::string& file) +{ + down_->cur_remote_id_ = id; + down_->cur_remote_file_ = file; + + fs::path remote_file(down_->cur_remote_file_); + down_->cur_file_ = COfPath::to_full(remote_file.filename().string()); + + // 请求下载文件 + std::shared_ptr buf = std::make_shared(); + buf->type_ = TYPE_OPEN_FILE; + buf->tid_ = id; + buf->data_ = new char[file.size() + 1]; + buf->len_ = std::snprintf(buf->data_, file.size() + 1, "%s", file.data()); + if (!send_frame(buf.get())) { + logger_->error("{} request open file [{}] send failed.", __FUNCTION__); + down_->cur_remote_id_.clear(); + down_->cur_remote_file_.clear(); + return false; + } + down_->trans_state_ = TRANS_REDAY; + return true; +} + +void CClient::cancel_trans_file(const std::string& key) +{ + std::shared_ptr t = nullptr; + if (key.empty()) { + t = down_; + } else { + if (up_.count(key)) { + t = up_[key]; + } + } + + if (t == nullptr) { + return; + } + + t->trans_state_ = TRANS_FAILE; + if (t->file_) { + fclose(t->file_); + if (key.empty()) { + fs::remove(t->cur_file_); + } + } + t->cur_remote_file_.clear(); + t->cur_remote_id_.clear(); +} + bool CClient::send_frame(CFrameBuffer* buf) { char* out_buf{}; @@ -136,27 +198,130 @@ void CClient::handle_frame(CFrameBuffer* buf) logger_->error("{} nullptr.", __FUNCTION__); return; } - // logger_->debug("type: {}", buf->type_); - // logger_->debug("len: {}", buf->len_); - if (buf->type_ == 199) { + auto t = static_cast(buf->type_); + switch (t) { + case TYPE_GET_LIST: { task_list_.clear(); std::string source(buf->data_); auto vec = COfStr::split(source, "\n"); + int index = -1; for (const auto& item : vec) { + if (item.empty()) { + continue; + } if (item.find("[") == std::string::npos) { logger_->info("FILE ==> {}", item); + task_list_[index]->files.push_back(item); } else { + auto a = item.find_first_of("[") + 1; + auto b = item.find_first_of("]"); + std::string str_index = item.substr(a, b - a); + index = std::stoi(str_index); + + std::string backup = item; + backup.erase(0, b + 1); + auto aa = backup.find_first_of("[") + 1; + auto bb = backup.find_first_of("]"); + std::string id = backup.substr(aa, bb - aa); + + if (!task_list_.count(index)) { + task_list_[index] = std::make_shared(); + task_list_[index]->id = id; + } + logger_->debug("***********************************************"); - logger_->debug("{}", item); + logger_->info("{}", item); } } - // int index = 0; - // auto vec = COfStr::split(source, "|"); - // for (const auto& item : vec) { - // task_list_[index] = item; - // ++index; - // logger_->warn("{}:{}", index, item); - // } + break; + } + // (客户服务都不是指Server)如果是客户端啥也不干,服务端则开始发送数据 + case TYPE_READY_TRANS: { + if (buf->mark_ != 0) { + break; + } + std::string* key = new std::string(); + key->append(buf->fid_); + send_pool_->submit([&]() { send_file_data_th(key); }); + break; + } + // 能接收到 TRANS 一定是客户端(这里不是指Server) + case TYPE_TRANS_FILE: { + auto ws = fwrite(buf->data_, 1, buf->len_, down_->file_); + if (static_cast(ws) != buf->len_) { + logger_->warn("no matched write and data."); + } + break; + } + case TYPE_OPEN_FILE: { + std::shared_ptr t = nullptr; + std::string id = buf->fid_; + if (buf->mark_ == 0) { + std::lock_guard lock(mutex_); + up_[buf->fid_] = std::make_shared(); + t = up_[buf->fid_]; + t->cur_file_ = std::string(buf->data_, buf->len_); + t->file_ = fopen(t->cur_file_.c_str(), "rb"); + } else { + t = down_; + t->file_ = fopen(t->cur_file_.c_str(), "wb"); + } + if (t->file_ == nullptr) { + logger_->error("open file {} failed.", t->cur_file_); + cancel_trans_file(buf->mark_ == 0 ? id : ""); + break; + } + std::shared_ptr tmp = std::make_shared(); + tmp->type_ = TYPE_READY_TRANS; + tmp->mark_ = buf->mark_ == 0 ? 1 : 0; + tmp->tid_ = buf->fid_; + if (!send_frame(tmp.get())) { + logger_->error("TYPE_OPEN_FILE send ready failed."); + cancel_trans_file(buf->mark_ == 0 ? id : ""); + } + break; + } + case TYPE_TRANS_DONE: { + fclose(down_->file_); + down_->trans_state_ = TRANS_DONE; + break; + } + default: + break; } } + +void CClient::send_file_data_th(std::string* key) +{ + std::string str_key = *key; + std::shared_ptr t = nullptr; + + { + std::lock_guard lock(mutex_); + if (!up_.count(str_key)) { + logger_->error("{} no matched key.", __FUNCTION__); + return; + } + t = up_[str_key]; + } + + std::shared_ptr buf = std::make_shared(); + buf->type_ = TYPE_TRANS_FILE; + buf->tid_ = str_key; + buf->data_ = new char[1024]{}; + while (!feof(t->file_)) { + buf->len_ = fread(buf->data_, 1, 1024, t->file_); + if (!send_frame(buf.get())) { + logger_->error("send_file_data_th send failed."); + delete key; + return; + } + } + + buf->type_ = TYPE_TRANS_DONE; + if (!send_frame(buf.get())) { + logger_->error("send_file_data_th send DONE failed."); + } + delete key; +} diff --git a/client/client.h b/client/client.h index 6660969..a6dcb47 100644 --- a/client/client.h +++ b/client/client.h @@ -2,8 +2,31 @@ #include #include #include +#include #include #include "file_oper.h" +#include + +using namespace ofen; +struct DownClientInfo { + std::vector files; + std::string id; +}; + +enum TransState { + TRANS_FAILE, + TRANS_ING, + TRANS_REDAY, + TRANS_DONE +}; + +struct TransInfomation { + std::string cur_remote_id_; + std::string cur_remote_file_; + std::string cur_file_; + FILE* file_{}; + TransState trans_state_{TRANS_FAILE}; +}; class CClient { @@ -16,20 +39,27 @@ public: public: bool get_task_list(); - bool down_task(); + bool down_task(const std::string& param); bool up_task(const std::string& cmd); bool cancel_task(); + bool down_one_file(const std::string& id, const std::string& file); + void cancel_trans_file(const std::string& key = ""); private: bool send_frame(CFrameBuffer* buf); private: void handle_frame(CFrameBuffer* buf); + void send_file_data_th(std::string* key); private: std::shared_ptr logger_; asio::io_context io_context_; std::shared_ptr client_; std::vector supported_; - std::map task_list_; + std::map> task_list_; + std::shared_ptr down_; + std::map> up_; + std::mutex mutex_; + std::shared_ptr send_pool_; }; \ No newline at end of file diff --git a/net/net_base.cpp b/net/net_base.cpp index 9c96ab7..3d2b5be 100644 --- a/net/net_base.cpp +++ b/net/net_base.cpp @@ -39,7 +39,7 @@ bool CTcpClient::send(const char* data, int len) { try { auto send_size = asio::write(socket_, asio::buffer(data, len)); - //logger_->info("Need Send len: {} Real Send len: {}", len, send_size); + // logger_->info("Need Send len: {} Real Send len: {}", len, send_size); return static_cast(send_size) == len; } catch (const std::exception& ex) { logger_->error("Send failed: {}", ex.what()); diff --git a/net/net_base.h b/net/net_base.h index a46d045..f9d73ca 100644 --- a/net/net_base.h +++ b/net/net_base.h @@ -27,4 +27,5 @@ private: CMutBuffer buffer_; std::array tmp_buf_; ExFun_t fun_; + std::string remote_key_; }; \ No newline at end of file diff --git a/server/server.cpp b/server/server.cpp index 35ed610..935652f 100644 --- a/server/server.cpp +++ b/server/server.cpp @@ -40,6 +40,8 @@ bool CTcpServer::start(unsigned short port) logger_->error("Failed to bind to {}: {}", endpoint.address().to_string(), e.what()); return false; } + auto bound_endpoint = acceptor_.local_endpoint(); + server_ip_ = bound_endpoint.address().to_string() + ":" + std::to_string(bound_endpoint.port()); accept_client(); logger_->info("Server started on port {}", port); return true; @@ -85,6 +87,8 @@ SimpleBuffer* CTcpServer::get_client_list() for (const auto& file : files) { msg.append("\n" + file); } + msg.append("\n"); + ++index; } buf->data_ = new char[msg.size() + 1]; buf->len_ = static_cast(msg.size() + 1); @@ -125,7 +129,7 @@ void CTcpServer::handle_frame() FrameType t = static_cast(buf->type_); switch (t) { case TYPE_GET_LIST: { - logger_->info("GetList."); + logger_->info("[{}] GetList.", buf->fid_); auto* sbuf = get_client_list(); if (sbuf == nullptr) { break; @@ -136,7 +140,7 @@ void CTcpServer::handle_frame() break; } case TYPE_UP_LIST: { - logger_->info("UpList. {}", std::string(buf->data_, buf->len_)); + logger_->info("[{}] UpList. {}", buf->fid_, std::string(buf->data_, buf->len_)); std::lock_guard lock(cli_mut_); if (client_map_.count(buf->fid_)) { auto& cli = client_map_[buf->fid_]; @@ -146,7 +150,7 @@ void CTcpServer::handle_frame() break; } case TYPE_CANCEL_LIST: { - logger_->info("Cancle Task."); + logger_->info("[{}] Cancle Task.", buf->fid_); std::lock_guard lock(cli_mut_); if (client_map_.count(buf->fid_)) { auto& cli = client_map_[buf->fid_]; @@ -154,14 +158,30 @@ void CTcpServer::handle_frame() } break; } - case TYPE_OPEN_FILE: + // 两边发送OPEN + case TYPE_OPEN_FILE: { + std::lock_guard lock(cli_mut_); + if (client_map_.count(buf->tid_)) { + auto& cli = client_map_[buf->tid_]; + if (!send_frame(cli->socket_, buf)) { + logger_->error("[{}] turn tid_ ailed to {}", buf->fid_, buf->tid_); + } + } + if (client_map_.count(buf->fid_)) { + auto& cli = client_map_[buf->fid_]; + if (!send_frame(cli->socket_, buf)) { + logger_->error("[{}] turn fid_ failed to {}", buf->fid_, buf->tid_); + } + } + break; + }; case TYPE_READY_TRANS: case TYPE_TRANS_FILE: { std::lock_guard lock(cli_mut_); if (client_map_.count(buf->tid_)) { auto& cli = client_map_[buf->tid_]; if (!send_frame(cli->socket_, buf)) { - logger_->error("turn failed to {}", buf->tid_); + logger_->error("[{}] turn failed to {}", buf->fid_, buf->tid_); } } break; @@ -279,6 +299,10 @@ bool CTcpServer::send_frame(std::shared_ptr socket, CFram { char* out_buf{}; int out_len{}; + if (buf->fid_.empty()) { + buf->fid_ = server_ip_; + } + if (!CTransProtocal::pack(buf, &out_buf, out_len)) { logger_->error("{} pack failed.", __FUNCTION__); return false; diff --git a/server/server.h b/server/server.h index bd75e1b..410bab0 100644 --- a/server/server.h +++ b/server/server.h @@ -64,4 +64,5 @@ private: std::mutex sbuf_mut_; std::shared_ptr handle_pool_; std::shared_ptr send_pool_; + std::string server_ip_; }; \ No newline at end of file diff --git a/util/util.cpp b/util/util.cpp index 01fc2f3..35cefad 100644 --- a/util/util.cpp +++ b/util/util.cpp @@ -29,6 +29,8 @@ CTransProtocal::~CTransProtocal() header 2 char: 0xFF 0xFE type 2 char: mark 1 char: + from 32 char: + to 32 char: len 4 char: data xxxxx: tail 2 char: 0xFF 0xFF @@ -45,8 +47,8 @@ CFrameBuffer* CTransProtocal::parse(CMutBuffer& buffer) } int16_t type = *(reinterpret_cast(buffer.get_data() + find + 2)); char mark = *(buffer.get_data() + find + 2 + 2); - int32_t len = *(reinterpret_cast(buffer.get_data() + find + 2 + 2 + 1)); - int32_t tail_index = find + 2 + 2 + 1 + 4 + len; + int32_t len = *(reinterpret_cast(buffer.get_data() + find + 2 + 2 + 1 + 32 + 32)); + int32_t tail_index = find + 2 + 2 + 1 + 32 + 32 + 4 + len; if (buffer.get_len() - 2 < tail_index || len < 0) { return result; } @@ -58,10 +60,12 @@ CFrameBuffer* CTransProtocal::parse(CMutBuffer& buffer) result = new CFrameBuffer(); result->data_ = new char[len]; result->len_ = len; + result->fid_ = std::string(buffer.get_data() + find + 2 + 2 + 1); + result->tid_ = std::string(buffer.get_data() + find + 2 + 2 + 1 + 32); result->mark_ = mark; result->type_ = static_cast(type); std::memset(result->data_, 0x0, len); - std::memcpy(result->data_, buffer.get_data() + find + 2 + 2 + 1 + 4, len); + std::memcpy(result->data_, buffer.get_data() + find + 2 + 2 + 1 + 4 + 32 + 32, len); buffer.remove_of(0, tail_index + 2); return result; } @@ -76,14 +80,21 @@ bool CTransProtocal::pack(CFrameBuffer* buf, char** out_buf, int& len) } unsigned char header[] = {0xFF, 0xFE}; unsigned char tail[] = {0xFF, 0xFF}; - len = buf->len_ + 11; - *out_buf = new char[len]; + len = buf->len_ + 75; + *out_buf = new char[len]{}; + std::memset(*out_buf, 0x0, len); std::memcpy(*out_buf, header, 2); std::memcpy(*out_buf + 2, &buf->type_, 2); std::memcpy(*out_buf + 2 + 2, &buf->mark_, 1); - std::memcpy(*out_buf + 2 + 2 + 1, &buf->len_, 4); + if (!buf->fid_.empty()) { + std::memcpy(*out_buf + 2 + 2 + 1, buf->fid_.data(), buf->fid_.size()); + } + if (!buf->tid_.empty()) { + std::memcpy(*out_buf + 2 + 2 + 1 + 32, buf->tid_.data(), buf->tid_.size()); + } + std::memcpy(*out_buf + 2 + 2 + 1 + 32 + 32, &buf->len_, 4); if (buf->data_ != nullptr) { - std::memcpy(*out_buf + 2 + 2 + 1 + 4, buf->data_, buf->len_); + std::memcpy(*out_buf + 2 + 2 + 1 + 32 + 32 + 4, buf->data_, buf->len_); } std::memcpy(*out_buf + len - 2, tail, 2); return true; diff --git a/util/util.h b/util/util.h index 00eefd9..f738d13 100644 --- a/util/util.h +++ b/util/util.h @@ -14,6 +14,7 @@ enum FrameType : int16_t { TYPE_CANCEL_LIST, TYPE_OPEN_FILE, TYPE_TRANS_FILE, + TYPE_TRANS_DONE, TYPE_READY_TRANS, TYPE_INTERRUPT, TYPE_NO_HIT_TASK, @@ -55,6 +56,8 @@ using ExFun_t = std::function; header 2 char: 0xFF 0xFE type 2 char: mark 1 char: + from 32 char: + to 32 char: len 4 char: data xxxxx: tail 2 char: 0xFF 0xFF