#include "client.h"

// NOTE(review): this file was recovered from text in which every `<...>` span
// (include targets and template arguments) had been stripped. The standard
// headers below cover the std:: names used here; project headers providing
// COfStr / COfPath / OfUtil / CCodec / fc_* are assumed to arrive through
// "client.h" -- TODO confirm and restore any that are missing.
#include <chrono>
#include <cstdio>
#include <cstring>
#include <exception>
#include <fstream>
#include <iostream>
#include <iterator>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#ifdef USE_BOOST_FILESYSTEM
#include <boost/filesystem.hpp>
namespace fs = boost::filesystem;
#else
#include <filesystem>
namespace fs = std::filesystem;
#endif

namespace {

/// Parses a decimal int from `text`; returns false instead of throwing when
/// the text is not a number or is out of range.
bool parse_int(const std::string& text, int& out)
{
    try {
        out = std::stoi(text);
        return true;
    } catch (const std::exception&) {
        return false;
    }
}

/// Parses a decimal long long from `text`; returns false instead of throwing.
bool parse_size(const std::string& text, long long& out)
{
    try {
        out = std::stoll(text);
        return true;
    } catch (const std::exception&) {
        return false;
    }
}

/// Returns true when any entry of `uploads` is in TRANS_REDAY or TRANS_ING.
/// The caller must hold the mutex that protects the map.
template <typename UploadMap>
bool has_active_upload(const UploadMap& uploads)
{
    for (const auto& item : uploads) {
        if (item.second->trans_state_ == TRANS_REDAY || item.second->trans_state_ == TRANS_ING) {
            return true;
        }
    }
    return false;
}

}   // namespace

/// Creates the network client on top of io_context_ and sets the heartbeat
/// sleep timeout to 5000.
CClient::CClient()
{
    // NOTE(review): the template argument was lost; it is taken from the
    // declared pointee type of client_. Confirm no derived type was intended.
    client_ = std::make_shared<decltype(client_)::element_type>(io_context_);
    supported_.push_back("Get");
    sleep_.set_timeout(5000);
}

/// Stops all worker threads, waits for them, then closes any file left open.
CClient::~CClient()
{
    th_run_ = false;
    sleep_.contiune();

    // Ask in-flight uploads to stop. The lock is released before joining:
    // sender threads take mutex_ inside report_trans_ret(), so joining while
    // holding it would deadlock.
    {
        std::lock_guard<decltype(mutex_)> lock(mutex_);
        for (const auto& item : up_) {
            if (item.second->file_.is_open()) {
                item.second->trans_state_ = TRANS_BREAK;
            }
        }
    }

    for (auto& item : ths_) {
        if (item.joinable()) {
            item.join();
        }
    }
    if (update_list_th_.joinable()) {
        update_list_th_.join();
    }
    if (th_down_active_.joinable()) {
        th_down_active_.join();
    }
    if (hearts_.joinable()) {
        hearts_.join();
    }

    // No worker can touch the streams any more; close whatever is still open.
    if (down_ && down_->file_.is_open()) {
        down_->file_.close();
    }
    std::lock_guard<decltype(mutex_)> lock(mutex_);
    for (const auto& item : up_) {
        if (item.second->file_.is_open()) {
            item.second->file_.close();
        }
    }
}

/// Connects to ip:port, starts the receive / heartbeat / io threads and then
/// runs the interactive command loop until "end", end of input, or link loss.
void CClient::run(const std::string& ip, const std::string& port)
{
    th_run_ = true;
    if (!client_->connect(ip, port)) {
        mpinfo("{} connect err.", __FUNCTION__);
        return;
    }
    client_->register_func([this](CFrameBuffer* buf) { handle_frame(buf); });
    client_->async_recv();
    hearts_ = std::thread([this]() { hearts(); });
    std::thread thread([this]() { io_context_.run(); });

    // Ask the server which ID was assigned to this client.
    {
        auto id_request = std::make_shared<CFrameBuffer>();
        id_request->type_ = TYPE_GET_ID;
        send_frame(id_request.get());
    }

    mpwarn("SupportCmd:Get|Up|Down|Cancel|Update");
    fc_append('|');

    while (1) {
        char* readline = fc_readline();
        if (readline == nullptr) {
            break;
        }
        if (!th_run_ || !client_->is_normal()) {
            fc_free(readline);   // was leaked on this exit path
            mpwarn("The link has been closed and cannot be continued. It will automatically exit.");
            break;
        }
        std::string cmd_input(readline);
        fc_free(readline);
        std::cout << "" << std::endl;
        cmd_input = ofen::COfStr::trim(cmd_input);

        if (cmd_input == "end" || cmd_input == "End") {
            th_run_ = false;
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            break;
        }
        if (cmd_input == "Get" || cmd_input == "get") {
            get_task_list();
            continue;
        }
        if (cmd_input == "Cancel" || cmd_input == "cancel") {
            cancel_task();
            continue;
        }

        // Every remaining command needs at least "<cmd> <param>".
        auto vec = COfStr::split(cmd_input, " ");
        if (vec.size() < 2) {
            mperror("No matched cmd, May be param size incorrect.");
            continue;
        }
        std::string param(cmd_input);
        std::string scmd = param.substr(0, param.find_first_of(" "));
        param.erase(0, param.find_first_of(" ") + 1);

        if (scmd == "Update" || scmd == "update") {
            request_update_list(param);
            continue;
        }
        if (scmd == "Down" || scmd == "down") {
            down_task(param);
            continue;
        }
        if (scmd == "Up" || scmd == "up") {
            up_task(param);
            continue;
        }
        mperror("No matched cmd, May be param size incorrect.");
    }

    client_->disconnect();
    thread.join();
    mpinfo("{} exit.", __FUNCTION__);
}

/// Sends a TYPE_GET_LIST request; the reply is handled in handle_frame().
/// @return true when the frame was sent.
bool CClient::get_task_list()
{
    auto buf = std::make_shared<CFrameBuffer>();
    buf->type_ = TYPE_GET_LIST;
    return send_frame(buf.get());
}

/// Downloads every file of the task whose list index is given in `param`.
/// @param param decimal index into task_list_.
/// @return false when the request is rejected (busy, bad index, own task,
///         empty list); true once the transfer loop has run, even if a file
///         inside it failed.
bool CClient::down_task(const std::string& param)
{
    if (downloading_) {
        mpwarn("Have Task Downloading, Please wait.....");
        return false;
    }

    int id = 0;
    if (!parse_int(param, id)) {   // std::stoi would throw on non-numeric input
        mperror("{} invalid task id [{}]", __FUNCTION__, param);
        return false;
    }

    const auto found = task_list_.find(id);
    if (found == task_list_.end()) {
        mperror("No matched id[{}] in task list.", id);
        return false;
    }
    // Keep the entry alive for the whole transfer even if the list is refreshed.
    const auto task = found->second;
    if (task->id == own_id_) {
        mpwarn("You can't down your own file!!!");
        return false;
    }

    down_ = std::make_shared<decltype(down_)::element_type>();
    if (task->files.empty()) {
        mpwarn("No files List, Please Check!");
        return false;
    }

    // Start transferring the files; stop at the first failure.
    for (const auto& item : task->files) {
        if (!down_one_file(task->id, item)) {
            break;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    return true;
}

/// Publishes the files named in `param` ('|'-separated) as this client's
/// upload list.
/// @return false when an upload is in progress, a path is missing or not a
///         regular file, the list is empty, or sending fails.
bool CClient::up_task(const std::string& param)
{
    {
        std::lock_guard<decltype(mutex_)> lock(mutex_);
        if (has_active_upload(up_)) {
            mpwarn("Have Task Upping, Please wait!");
            return false;
        }
    }

    auto list = CFileOpr::get_file_list(param);
    std::string msg;
    for (const auto& item : list) {
        if (!fs::exists(item)) {
            mperror("File {} not exist, please check.", item);
            return false;
        }
        if (!fs::is_regular_file(item)) {
            // The original format string had no placeholder for `item`.
            mperror("Only Support Up File, But directory: {}", item);
            return false;
        }
        if (msg.empty()) {
            msg.append(item);
        } else {
            msg.append("|" + item);
        }
    }
    if (msg.empty()) {
        mpwarn("{} msg empty.", __FUNCTION__);
        return false;
    }

#ifdef _WIN32
    msg = CCodec::ansi_to_u8(msg);
#endif

    auto buf = std::make_shared<CFrameBuffer>();
    buf->type_ = TYPE_UP_LIST;
    // presumably CFrameBuffer releases data_ itself (nothing here frees it) -- TODO confirm
    buf->data_ = new char[msg.size() + 1];
    std::memset(buf->data_, 0x0, msg.size() + 1);
    buf->len_ = std::snprintf(buf->data_, msg.size() + 1, "%s", msg.data());
    return send_frame(buf.get());
}

/// Sends TYPE_CANCEL_LIST unless an upload is currently READY or in progress.
bool CClient::cancel_task()
{
    {
        std::lock_guard<decltype(mutex_)> lock(mutex_);
        if (has_active_upload(up_)) {
            mpwarn("Have Task Upping, Please wait!");
            return false;
        }
    }
    auto buf = std::make_shared<CFrameBuffer>();
    buf->type_ = TYPE_CANCEL_LIST;
    return send_frame(buf.get());
}

/// Requests one remote file and blocks until the transfer is done or failed.
/// @param id        remote client ID that owns the file.
/// @param file      remote file path as received (UTF-8 on the wire).
/// @param local_dir target directory; when empty the file name is resolved
///                  through COfPath::to_full().
/// @return true only when bytes were received and the count equals the
///         announced file size.
bool CClient::down_one_file(const std::string& id, const std::string& file, const std::string& local_dir)
{
    std::string back_file(file);
    std::string back_local_dir(local_dir);
#ifdef _WIN32
    back_file = CCodec::u8_to_ansi(back_file);
    back_local_dir = CCodec::u8_to_ansi(back_local_dir);
#endif

    down_->cur_remote_id_ = id;
    down_->cur_remote_file_ = back_file;
    fs::path remote_file(ofen::COfPath::normalize(down_->cur_remote_file_));
    if (back_local_dir.empty()) {
        down_->cur_file_ = COfPath::to_full(remote_file.filename().string());
    } else {
        down_->cur_file_ = fs::path(back_local_dir).append(remote_file.filename().string()).string();
    }

    mpwarn("Start Down => {} To {}", down_->cur_remote_file_, down_->cur_file_);
    down_->file_.open(down_->cur_file_, std::ios::out | std::ios::binary);
    if (!down_->file_.is_open()) {
        mperror("Open {} Failed.", down_->cur_file_);
        return false;
    }

    // Request the file download.
    auto buf = std::make_shared<CFrameBuffer>();
    buf->type_ = TYPE_OPEN_FILE;
    buf->tid_ = id;
    buf->data_ = new char[file.size() + 1];
    buf->len_ = std::snprintf(buf->data_, file.size() + 1, "%s", file.data());
    if (!send_frame(buf.get())) {
        mperror("{} request open file [{}] send failed.", __FUNCTION__, down_->cur_remote_file_);
        down_->cur_remote_id_.clear();
        down_->cur_remote_file_.clear();
        // Do not leave the stream open: the next open() on it would fail.
        down_->file_.close();
        fs::remove(down_->cur_file_);
        return false;
    }

    will_receive_ = true;
    down_->trans_state_ = TRANS_REDAY;
    cur_down_size_ = 0;

    // Poll until the receive path reports DONE or FAILED, showing progress.
    float percent = 0.0;
    while (down_->trans_state_ != TRANS_DONE && down_->trans_state_ != TRANS_FAILED) {
        std::this_thread::sleep_for(std::chrono::milliseconds(down_check_wait));
        if (cur_file_size_ > 0) {
            percent = static_cast<float>(cur_down_size_) / cur_file_size_;
            CTransProtocal::display_progress(percent);
        }
        if (!th_run_) {
            mperror("Interrup When Receive File.");
            report_trans_ret(TRANS_FAILED);
            return false;
        }
    }
    if (cur_file_size_ > 0) {
        percent = static_cast<float>(cur_down_size_) / cur_file_size_;
        CTransProtocal::display_progress(percent);
    }

    if (cur_down_size_ > 0 && cur_file_size_ == cur_down_size_) {
        mpwarn("down one file success, total:[{}/{}]", cur_down_size_, cur_file_size_);
        return true;
    }

    mpwarn("down one file {} failed.", down_->cur_file_);
    // The original tested `!is_open()`, so a still-open file (e.g. after
    // TYPE_OPEN_FAILED) was never closed or removed.
    if (down_->file_.is_open()) {
        down_->file_.close();
    }
    fs::remove(down_->cur_file_);
    return false;
}

/// Records the final state of a transfer and releases its file.
/// @param state final state to store.
/// @param key   empty for the current download; otherwise the key of the
///              upload in up_.
void CClient::report_trans_ret(TransState state, const std::string& key)
{
    decltype(down_) t = nullptr;
    if (key.empty()) {
        t = down_;
        downloading_ = false;
        will_receive_ = false;
        // The watchdog loop exits once downloading_ is false.
        if (th_down_active_.joinable()) {
            th_down_active_.join();
        }
    } else {
        std::lock_guard<decltype(mutex_)> lock(mutex_);
        if (up_.count(key)) {
            t = up_[key];
        }
    }
    if (t == nullptr) {
        return;
    }

    t->trans_state_ = state;
    if (t->file_.is_open()) {
        t->file_.close();
        // Only a failed download leaves a partial local file to delete.
        if (key.empty() && t->trans_state_ == TRANS_FAILED) {
            fs::remove(t->cur_file_);
        }
    }
    t->cur_remote_file_.clear();
    t->cur_remote_id_.clear();
}

/*
    List file, one entry per line in the form:
        D:/a.txt|/home/zhangsan/
        C:/Dijava|/home/zhangsan/dia
    Purpose: ask a given client to update the files listed here. The right-hand
    side is the directory on the remote side where the file is stored (it must
    exist; otherwise the entry is ignored).

    param is "<task index> <list file>". Returns true when the request was sent.
*/
bool CClient::request_update_list(const std::string& param)
{
    auto tvec = COfStr::split(param, " ");
    if (tvec.size() < 2) {
        mperror("{} invalid param format [{}]", __FUNCTION__, param);
        return false;
    }
    int index = 0;
    if (!parse_int(tvec[0], index)) {   // std::stoi would throw on non-numeric input
        mperror("{} invalid param format [{}]", __FUNCTION__, param);
        return false;
    }
    std::string list_file = tvec[1];

    if (downloading_) {
        mpwarn("Have Task Downloading, Please wait.....");
        return false;
    }
    if (!task_list_.count(index)) {
        mperror("No Index Found {}.", index);
        return false;
    }
    const auto& sr = task_list_[index];
    if (sr->id == own_id_) {
        mpwarn("You can't update your own file!!!");
        return false;
    }

    // Read the list file.
    std::ifstream in(COfPath::to_full(list_file));
    if (!in.is_open()) {
        mperror("Can't Open File:{}", COfPath::to_full(list_file));
        return false;
    }
    std::istreambuf_iterator<char> iterf(in);
    std::istreambuf_iterator<char> iter;
    std::string content(iterf, iter);
    in.close();
#if defined(_WIN32)
    content = CCodec::u8_to_ansi(content);
#endif

    // Validate the format: every non-empty line needs "<existing file>|<dir>".
    auto vec = COfStr::split(content, "\n");
    bool valid = true;
    std::string handled_content;
    for (const auto& item : vec) {
        std::string hitem = COfStr::trim(item);
        if (hitem.empty()) {
            continue;
        }
        mpinfo("---> check {}", hitem);
        auto v = COfStr::split(hitem, "|");
        if (v.size() < 2) {
            valid = false;
            break;
        }
        if (!fs::exists(v[0])) {
            mperror("file {} not exist.", v[0]);
            valid = false;
            break;
        }
        handled_content.append(hitem + "\n");
    }
    if (!valid) {
        mperror("Judge List File {} Format Not Passed.", list_file);
        return false;
    }

#if defined(_WIN32)
    handled_content = CCodec::ansi_to_u8(handled_content);
#endif
    list_file_ = list_file;

    auto buf = std::make_shared<CFrameBuffer>();
    buf->type_ = TYPE_REQUEST_UPDATE_LIST;
    buf->data_ = new char[handled_content.size() + 1]();
    buf->len_ = std::snprintf(buf->data_, handled_content.size() + 1, "%s", handled_content.c_str());
    buf->tid_ = sr->id;
    if (!send_frame(buf.get())) {
        mperror("Send Failed {}", __LINE__);
        return false;
    }
    return true;
}

/// Validates an update list received from a peer and collects its entries.
/// @param content lines of "<remote file>|<local dir>".
/// @param files   receives remote file -> local directory for every line
///                accepted before the first invalid one.
/// @return false when a line does not have exactly two fields or the local
///         directory does not exist.
bool CClient::check_update_list(const std::string& content, std::map<std::string, std::string>& files)
{
    std::string back_str(content);
#ifdef _WIN32
    back_str = CCodec::u8_to_ansi(back_str);
#endif
    auto vec = COfStr::split(back_str, "\n");
    bool valid = true;
    for (const auto& item : vec) {
        if (item.empty()) {
            continue;
        }
        auto vi = COfStr::split(item, "|");
        if (vi.size() != 2) {
            mperror("Size not 2 {}", item);
            valid = false;
            break;
        }
        if (!fs::exists(vi[1])) {
            valid = false;
            mperror("Not exist {}", vi[1]);
            break;
        }
#ifdef _WIN32
        files[CCodec::ansi_to_u8(vi[0])] = CCodec::ansi_to_u8(vi[1]);
#else
        files[vi[0]] = vi[1];
#endif
    }
    return valid;
}

/// Downloads every entry of `files` from list_serve_id_ and reports the
/// overall result back to that client.
/// @return true when all files were downloaded.
bool CClient::down_update_file(std::map<std::string, std::string> files)
{
    auto buf = std::make_shared<CFrameBuffer>();
    buf->tid_ = list_serve_id_;
    down_ = std::make_shared<decltype(down_)::element_type>();

    bool suc = true;
    for (const auto& item : files) {
        if (!down_one_file(list_serve_id_, item.first, item.second)) {
            suc = false;
            break;
        }
    }
    if (suc) {
        buf->type_ = TYPE_DONE_UPDATE_LIST;
        mpinfo("Do Task From Remote {} Done!", buf->tid_);
    } else {
        buf->type_ = TYPE_FAILED_UPDATE_LIST;
        mpinfo("Do Task From Remote {} Failed!", buf->tid_);
    }
    send_frame(buf.get());
    return suc;
}

/// Packs `buf` and sends it; sends are serialised by send_mut_.
/// @return false when packing or sending fails.
bool CClient::send_frame(CFrameBuffer* buf)
{
    char* out_buf{};
    int out_len{};
    if (!CTransProtocal::pack(buf, &out_buf, out_len)) {
        mperror("{} pack failed.", __FUNCTION__);
        return false;
    }
    // The packed buffer is released with delete[] on every path below.
    std::unique_ptr<char[]> packed(out_buf);
    std::lock_guard<decltype(send_mut_)> lock(send_mut_);
    if (!client_->send(out_buf, out_len)) {
        return false;
    }
    return true;
}

/// Dispatches one received frame according to its type_.
void CClient::handle_frame(CFrameBuffer* buf)
{
    if (buf == nullptr) {
        mperror("{} nullptr.", __FUNCTION__);
        return;
    }
    switch (buf->type_) {
    case TYPE_GET_ID: {
        mpdebug("Your ID:{}", buf->tid_);
        own_id_ = buf->tid_;
        break;
    }
    case TYPE_GET_LIST: {
        // Rebuild the task list: "[index][id]..." header lines open a task,
        // every other non-empty line is a file of the most recent task.
        task_list_.clear();
        std::string source(buf->data_, buf->len_);
        auto vec = COfStr::split(source, "\n");
        int index = -1;
        for (const auto& item : vec) {
            std::string real = COfStr::trim(item);
            if (real.empty()) {
                continue;
            }
            if (real.find("[") == std::string::npos) {
                const auto owner = task_list_.find(index);
                if (owner == task_list_.end()) {
                    // A file line with no header before it used to
                    // dereference a null entry.
                    mperror("{} file entry [{}] has no task header, ignored.", __FUNCTION__, real);
                    continue;
                }
#ifdef _WIN32
                mpinfo("FILE ==> {}", CCodec::u8_to_ansi(real));
#else
                mpinfo("FILE ==> {}", real);
#endif
                owner->second->files.push_back(real);
                continue;
            }

            const auto a = real.find_first_of("[") + 1;
            const auto b = real.find_first_of("]");
            int parsed = -1;
            if (b == std::string::npos || b < a || !parse_int(real.substr(a, b - a), parsed)) {
                mperror("{} malformed task header [{}], ignored.", __FUNCTION__, real);
                index = -1;   // drop the files that follow the bad header
                continue;
            }
            index = parsed;

            std::string backup = real;
            backup.erase(0, b + 1);
            auto aa = backup.find_first_of("[") + 1;
            auto bb = backup.find_first_of("]");
            std::string id = backup.substr(aa, bb - aa);
            if (!task_list_.count(index)) {
                task_list_[index] = std::make_shared<decltype(task_list_)::mapped_type::element_type>();
                task_list_[index]->id = id;
            }
            mpdebug("*****************************************");
            mpinfo("{}", real);
        }
        break;
    }
    // Whoever receives TRANS must be a client (this does not mean the server).
    case TYPE_TRANS_FILE: {
        if (!down_) {
            // Data without any download context: nothing to write to.
            break;
        }
        if (!downloading_) {
            downloading_ = true;
            th_down_active_ = std::thread([this]() { judget_down_active(); });
        }
        if (will_receive_) {
            down_->file_.write(buf->data_, buf->len_);
            if (down_->file_.fail()) {
                report_trans_ret(TRANS_FAILED);
                mpwarn("no matched write and data. {}", buf->len_);
                // Do not count bytes that were not written, otherwise a
                // failed last chunk looks like a complete file.
                break;
            }
            cur_down_size_ += buf->len_;
        }
        break;
    }
    case TYPE_OPEN_FILE: {
        std::string keys{};
        {
            std::lock_guard<decltype(mutex_)> lock(mutex_);
            auto& task = up_[buf->fid_];
            task = std::make_shared<decltype(up_)::mapped_type::element_type>();
#ifdef _WIN32
            task->cur_file_ = CCodec::u8_to_ansi(std::string(buf->data_, buf->len_));
#else
            task->cur_file_ = std::string(buf->data_, buf->len_);
#endif
            task->file_.open(task->cur_file_, std::ios::in | std::ios::binary);
            if (!task->file_.is_open()) {
                // Leaving the entry in TRANS_REDAY would make up_task() and
                // cancel_task() report a busy upload forever.
                task->trans_state_ = TRANS_FAILED;
                mperror("Ready Send File {} Open Failed.", task->cur_file_);
                buf->type_ = TYPE_OPEN_FAILED;
                std::swap(buf->tid_, buf->fid_);
                if (!send_frame(buf)) {
                    mperror("Send Failed {}.", __LINE__);
                }
                break;
            }
            task->trans_state_ = TRANS_REDAY;
            keys = buf->fid_;
        }
        if (!keys.empty()) {
            ths_.emplace_back(std::thread([this, keys]() { send_file_data_th(keys.c_str()); }));
        }
        break;
    }
    case TYPE_TRANS_DONE: {
        report_trans_ret(TRANS_DONE);
        break;
    }
    case TYPE_OPEN_FAILED: {
        mperror("Remote {} Open File Failed.", buf->fid_);
        if (down_) {
            down_->trans_state_ = TRANS_FAILED;
        }
        break;
    }
    case TYPE_OFFLINE: {
        if (buf->mark_) {
            // The peer we are uploading to went away: stop that sender.
            std::lock_guard<decltype(mutex_)> lock(mutex_);
            if (!up_.count(buf->fid_)) {
                mpwarn("Offline no match.");
                break;
            }
            auto t = up_[buf->fid_];
            t->trans_state_ = TRANS_BREAK;
            break;
        }
        if (downloading_ && down_ && !down_->cur_remote_file_.empty()) {
            mpwarn("Stop Down {} From {}.", down_->cur_remote_file_, buf->fid_);
        }
        report_trans_ret(TRANS_FAILED);
        break;
    }
    case TYPE_REQUEST_UPDATE_LIST: {
        std::map<std::string, std::string> files;
        if (down_ && down_->trans_state_ == TRANS_REDAY) {
            mpwarn("Update Busy......, Ignore {}", buf->fid_);
            buf->type_ = TYPE_BUSY_UPDATE_LIST;
        } else {
            std::string content(buf->data_, buf->len_);
            if (check_update_list(content, files)) {
                buf->type_ = TYPE_CONFIRM_UPDATE_LIST;
            } else {
                buf->type_ = TYPE_UNCONFIRM_UPDATE_LIST;
            }
        }
        // Reply to the sender: swap source and target IDs.
        std::swap(buf->tid_, buf->fid_);
        if (!send_frame(buf)) {
            mperror("Send Failed {}.", __LINE__);
            break;
        }
        if (buf->type_ != TYPE_CONFIRM_UPDATE_LIST) {
            break;
        }
        list_serve_id_ = buf->tid_;
        mpdebug("Do Task From Remote {}.", buf->tid_);
        if (update_list_th_.joinable()) {
            update_list_th_.join();
        }
        update_list_th_ = std::thread([this, files]() { down_update_file(files); });
        break;
    }
    case TYPE_CONFIRM_UPDATE_LIST: {
        mpinfo("remote {} check {} passed!", buf->fid_, list_file_);
        break;
    }
    case TYPE_UNCONFIRM_UPDATE_LIST: {
        mperror("remote {} check {} not passed!", buf->fid_, list_file_);
        break;
    }
    case TYPE_DONE_UPDATE_LIST: {
        mpinfo("remote {} do task {} success!", buf->fid_, list_file_);
        break;
    }
    case TYPE_FAILED_UPDATE_LIST: {
        mpinfo("remote {} do task {} failed!", buf->fid_, list_file_);
        break;
    }
    case TYPE_BUSY_UPDATE_LIST: {
        mpinfo("remote {} are busy, will not exec task {}", buf->fid_, list_file_);
        break;
    }
    case TYPE_FILE_SIZE: {
        std::string str_size(buf->data_, buf->len_);
        long long size = 0;
        if (!parse_size(str_size, size)) {   // std::stoll would throw on bad peer data
            mperror("{} invalid file size [{}]", __FUNCTION__, str_size);
            break;
        }
        std::string show_str = OfUtil::get_file_size(size);
        mpinfo("Ready Down Size: {}", show_str);
        cur_file_size_ = size;
        break;
    }
    default:
        break;
    }
}

/// Thread body that streams the file registered in up_ under `keys` to that
/// client: the size first, then the data in g_BuffSize chunks, then
/// TYPE_TRANS_DONE.
void CClient::send_file_data_th(const char* keys)
{
    std::string str_key(keys);
    decltype(up_)::mapped_type t = nullptr;
    {
        std::lock_guard<decltype(mutex_)> lock(mutex_);
        if (!up_.count(str_key)) {
            mperror("{} no matched key.", __FUNCTION__);
            return;
        }
        t = up_[str_key];
    }

    mpinfo("Start Trans File {} To {}", t->cur_file_, str_key);
    auto buf = std::make_shared<CFrameBuffer>();
    buf->data_ = new char[g_BuffSize]{};
    buf->tid_ = str_key;

    // seekg is used for reading, seekp for writing.
    t->file_.seekg(0, std::ios::end);
    long long size = t->file_.tellg();
    t->file_.seekg(0, std::ios::beg);

    buf->type_ = TYPE_FILE_SIZE;
    std::string str_size = std::to_string(size);
    mpinfo("To {} File Size: {} [{}]", str_key, ofen::OfUtil::get_file_size(size), size);
    buf->len_ = std::snprintf(buf->data_, g_BuffSize, "%s", str_size.c_str());
    if (!send_frame(buf.get())) {
        report_trans_ret(TRANS_FAILED, str_key);
        mperror("Stop Trans {} To {} failed.", t->cur_file_, str_key);
        return;
    }

    buf->type_ = TYPE_TRANS_FILE;
    buf->mark_ = 1;
    while (!t->file_.eof()) {
        if (t->trans_state_ == TRANS_BREAK) {
            mpwarn("Stop Trans {} To {} failed.", t->cur_file_, str_key);
            report_trans_ret(TRANS_FAILED, str_key);
            return;
        }
        t->file_.read(buf->data_, g_BuffSize);
        if (t->file_.bad()) {
            // A hard read error never sets eofbit; without this check the
            // loop would spin forever.
            report_trans_ret(TRANS_FAILED, str_key);
            mperror("Stop Trans {} To {} failed.", t->cur_file_, str_key);
            return;
        }
        buf->len_ = t->file_.gcount();
        if (!send_frame(buf.get())) {
            report_trans_ret(TRANS_FAILED, str_key);
            mperror("Stop Trans {} To {} failed.", t->cur_file_, str_key);
            return;
        }
        // std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }

    buf->type_ = TYPE_TRANS_DONE;
    if (!send_frame(buf.get())) {
        mperror("send_file_data_th send DONE failed.");
    }
    report_trans_ret(TRANS_DONE, str_key);
    mpdebug("Trans File {} To {} Done !!!", t->cur_file_, str_key);
}

/// Heartbeat thread: sends TYPE_HEARTS after every sleep_ interval and clears
/// th_run_ when a send fails.
void CClient::hearts()
{
    auto buf = std::make_shared<CFrameBuffer>();
    buf->type_ = TYPE_HEARTS;
    while (th_run_) {
        sleep_.sleep();
        if (th_run_ && !send_frame(buf.get())) {
            mperror("{} send failed.", __FUNCTION__);
            th_run_ = false;
        }
    }
}

/// Watchdog thread for a download: sends TYPE_JUDGE_ACTIVE to the remote
/// peer every 2000 ms while downloading_ and th_run_ are set.
void CClient::judget_down_active()
{
    auto buf = std::make_shared<CFrameBuffer>();
    buf->type_ = TYPE_JUDGE_ACTIVE;
    buf->tid_ = down_->cur_remote_id_;
    while (downloading_ && th_run_) {
        std::this_thread::sleep_for(std::chrono::milliseconds(2000));
        send_frame(buf.get());
    }
}

CFileOpr::CFileOpr()
{
}

CFileOpr::~CFileOpr()
{
}

/// Splits `input` on '|' into paths, trims each, strips quote characters
/// (double quotes on Windows, single quotes elsewhere) and resolves each
/// through COfPath::to_full().
/// @return the resolved paths; empty when `input` is blank.
std::vector<std::string> CFileOpr::get_file_list(const std::string& input)
{
    std::vector<std::string> result;
    auto backup = COfStr::trim(input);
    if (backup.empty()) {
        return result;
    }
    auto vec = COfStr::split(backup, "|");
    for (const auto& item : vec) {
        std::string ret = COfStr::trim(item);
        std::string trim_item = ret;
#ifdef _WIN32
        if (item.find("\"") != std::string::npos) {
            ret = COfStr::replace(trim_item, "\"", "");
        }
#else
        if (item.find(R"(')") != std::string::npos) {
            ret = COfStr::replace(trim_item, R"(')", "");
        }
#endif
        result.push_back(COfPath::to_full(ret));
    }
    return result;
}