transm/client/client.cpp

405 lines
12 KiB
C++
Raw Normal View History

2024-12-11 23:23:48 +08:00
#include "client.h"
#include <filesystem>
2024-12-11 23:23:48 +08:00
#include <iostream>
#include <of_path.h>
#include <of_str.h>
#include <of_util.h>
2024-12-11 23:23:48 +08:00
namespace fs = std::filesystem;
constexpr int g_SendPoolNum = 1;
2024-12-11 23:23:48 +08:00
CClient::CClient(const std::shared_ptr<spdlog::logger>& logger) : logger_(logger)
{
client_ = std::make_shared<CTcpClient>(io_context_, logger_);
send_pool_ = std::make_shared<CThreadPool>(g_SendPoolNum);
send_pool_->init();
2024-12-13 16:59:31 +08:00
supported_.push_back("Get");
sleep_.set_timeout(2000);
2024-12-11 23:23:48 +08:00
}
CClient::~CClient()
{
th_run_ = false;
sleep_.contiune();
if (down_->file_) {
fclose(down_->file_);
down_->file_ = nullptr;
}
std::lock_guard<std::mutex> lock(mutex_);
for (const auto& item : up_) {
if (item.second->file_) {
fclose(item.second->file_);
item.second->file_ = nullptr;
}
}
if (hearts_.joinable()) {
hearts_.join();
}
2024-12-11 23:23:48 +08:00
}
void CClient::run(const std::string& ip, const std::string& port)
2024-12-11 23:23:48 +08:00
{
th_run_ = true;
if (!client_->connect(ip, port)) {
logger_->info("{} connect err.", __FUNCTION__);
return;
}
client_->register_func([&](CFrameBuffer* buf) { handle_frame(buf); });
client_->async_recv();
hearts_ = std::thread([&]() { hearts(); });
std::thread thread([&]() { io_context_.run(); });
logger_->warn("SupportCmd:Get|Up|Down|Cancel");
2024-12-11 23:23:48 +08:00
char line[512]{};
while (std::cin.getline(line, 512)) {
if (!th_run_) {
break;
}
std::string cmd_input(line);
if (cmd_input == "end") {
2024-12-11 23:23:48 +08:00
break;
}
auto vec = COfStr::split(cmd_input, " ");
2024-12-13 16:59:31 +08:00
if (vec.size() < 1) {
logger_->error("input's invalid format.");
continue;
}
2024-12-13 16:59:31 +08:00
std::string cmd{};
std::string param{};
if (vec.size() == 1) {
cmd = vec[0];
} else {
2024-12-13 16:59:31 +08:00
cmd = vec[0];
param = vec[1];
}
if (cmd == "Get") {
get_task_list();
continue;
}
if (cmd == "Down") {
down_task(vec[1]);
2024-12-13 16:59:31 +08:00
continue;
}
if (cmd == "Up") {
up_task(cmd_input);
continue;
}
if (cmd == "Cancel") {
cancel_task();
2024-12-13 16:59:31 +08:00
continue;
}
logger_->error("No matched cmd.");
2024-12-11 23:23:48 +08:00
}
client_->disconnect();
thread.join();
logger_->info("{} exit.", __FUNCTION__);
}
bool CClient::get_task_list()
{
std::shared_ptr<CFrameBuffer> buf = std::make_shared<CFrameBuffer>();
buf->type_ = TYPE_GET_LIST;
return send_frame(buf.get());
}
bool CClient::down_task(const std::string& param)
{
int id = std::stoi(param);
if (!task_list_.count(id)) {
logger_->error("No matched id[{}] in task list.", id);
return false;
}
down_ = std::make_shared<TransInfomation>();
const auto& vec = task_list_[id]->files;
// 开始传输文件
for (const auto& item : vec) {
if (!down_one_file(task_list_[id]->id, item)) {
break;
}
}
return true;
}
bool CClient::up_task(const std::string& cmd)
2024-12-13 16:59:31 +08:00
{
auto list = CFileOpr::get_file_list(cmd);
std::string msg;
for (const auto& item : list) {
if (!fs::exists(item)) {
logger_->error("File {} not exist, please check!", item);
return false;
}
if (msg.empty()) {
msg.append(item);
} else {
msg.append("|" + item);
}
}
if (msg.empty()) {
logger_->warn("{} msg empty.", __FUNCTION__);
return false;
}
#ifdef _WIN32
msg = CCodec::GBKTou8(msg);
#endif
std::shared_ptr<CFrameBuffer> buf = std::make_shared<CFrameBuffer>();
buf->type_ = TYPE_UP_LIST;
buf->data_ = new char[msg.size() + 1];
std::memset(buf->data_, 0x0, msg.size() + 1);
buf->len_ = std::snprintf(buf->data_, msg.size() + 1, "%s", msg.data());
return send_frame(buf.get());
2024-12-13 16:59:31 +08:00
}
bool CClient::cancel_task()
2024-12-13 16:59:31 +08:00
{
std::shared_ptr<CFrameBuffer> buf = std::make_shared<CFrameBuffer>();
buf->type_ = TYPE_CANCEL_LIST;
return send_frame(buf.get());
}
bool CClient::down_one_file(const std::string& id, const std::string& file)
{
down_->cur_remote_id_ = id;
2024-12-17 09:40:18 +08:00
#ifdef _WIN32
2024-12-17 09:34:37 +08:00
down_->cur_remote_file_ = CCodec::u8ToGBK(file);
2024-12-17 09:40:18 +08:00
#else
down_->cur_remote_file_ = file;
#endif
fs::path remote_file(ofen::COfPath::normalize(down_->cur_remote_file_));
down_->cur_file_ = COfPath::to_full(remote_file.filename().string());
2024-12-17 09:40:18 +08:00
logger_->warn("Start Down => {} To {}", down_->cur_remote_file_, down_->cur_file_);
// 请求下载文件
std::shared_ptr<CFrameBuffer> buf = std::make_shared<CFrameBuffer>();
buf->type_ = TYPE_OPEN_FILE;
buf->tid_ = id;
buf->data_ = new char[file.size() + 1];
buf->len_ = std::snprintf(buf->data_, file.size() + 1, "%s", file.data());
if (!send_frame(buf.get())) {
logger_->error("{} request open file [{}] send failed.", __FUNCTION__, file);
down_->cur_remote_id_.clear();
down_->cur_remote_file_.clear();
return false;
}
down_->trans_state_ = TRANS_REDAY;
2024-12-16 14:04:52 +08:00
while (down_->trans_state_ != TRANS_DONE) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
if (!th_run_) {
logger_->error("Interrup When Receive File.");
report_trans_ret(TRANS_FAILED);
return false;
}
2024-12-16 14:04:52 +08:00
}
return true;
}
void CClient::report_trans_ret(TransState state, const std::string& key)
{
std::shared_ptr<TransInfomation> t = nullptr;
if (key.empty()) {
t = down_;
} else {
std::lock_guard<std::mutex> lock(mutex_);
if (up_.count(key)) {
t = up_[key];
}
}
if (t == nullptr) {
return;
}
t->trans_state_ = state;
if (t->file_) {
fclose(t->file_);
t->file_ = nullptr;
if (key.empty() && t->trans_state_ == TRANS_FAILED) {
fs::remove(t->cur_file_);
}
}
t->cur_remote_file_.clear();
t->cur_remote_id_.clear();
}
bool CClient::send_frame(CFrameBuffer* buf)
{
char* out_buf{};
int out_len{};
if (!CTransProtocal::pack(buf, &out_buf, out_len)) {
logger_->error("{} pack failed.", __FUNCTION__);
return false;
}
std::lock_guard<std::mutex> lock(send_mut_);
if (!client_->send(out_buf, out_len)) {
logger_->error("{} send failed.", __FUNCTION__);
delete[] out_buf;
return false;
}
delete[] out_buf;
return true;
2024-12-13 16:59:31 +08:00
}
void CClient::handle_frame(CFrameBuffer* buf)
{
if (buf == nullptr) {
logger_->error("{} nullptr.", __FUNCTION__);
return;
}
2024-12-15 13:14:04 +08:00
switch (buf->type_) {
case TYPE_GET_LIST: {
2024-12-13 16:59:31 +08:00
task_list_.clear();
2024-12-14 19:49:44 +08:00
std::string source(buf->data_, buf->len_);
auto vec = COfStr::split(source, "\n");
int index = -1;
2024-12-13 16:59:31 +08:00
for (const auto& item : vec) {
2024-12-14 19:49:44 +08:00
std::string real = COfStr::trim(item);
if (real.empty()) {
continue;
}
2024-12-14 19:49:44 +08:00
if (real.find("[") == std::string::npos) {
2024-12-17 09:40:18 +08:00
#ifdef _WIN32
2024-12-17 09:34:37 +08:00
logger_->info("FILE ==> {}", CCodec::u8ToGBK(real));
2024-12-17 09:40:18 +08:00
#else
logger_->info("FILE ==> {}", real);
#endif
2024-12-14 19:49:44 +08:00
task_list_[index]->files.push_back(real);
} else {
2024-12-14 19:49:44 +08:00
auto a = real.find_first_of("[") + 1;
auto b = real.find_first_of("]");
std::string str_index = real.substr(a, b - a);
index = std::stoi(str_index);
2024-12-14 19:49:44 +08:00
std::string backup = real;
backup.erase(0, b + 1);
auto aa = backup.find_first_of("[") + 1;
auto bb = backup.find_first_of("]");
std::string id = backup.substr(aa, bb - aa);
if (!task_list_.count(index)) {
task_list_[index] = std::make_shared<DownClientInfo>();
task_list_[index]->id = id;
}
logger_->debug("***********************************************");
2024-12-14 19:49:44 +08:00
logger_->info("{}", real);
}
2024-12-13 16:59:31 +08:00
}
break;
}
// (客户服务都不是指Server)如果是客户端啥也不干,服务端则开始发送数据
case TYPE_READY_TRANS: {
if (buf->mark_ != 0) {
break;
}
work_key_ = buf->fid_;
send_pool_->submit([&]() { send_file_data_th(); });
break;
}
// 能接收到 TRANS 一定是客户端(这里不是指Server)
case TYPE_TRANS_FILE: {
auto ws = fwrite(buf->data_, 1, buf->len_, down_->file_);
if (static_cast<int>(ws) != buf->len_) {
logger_->warn("no matched write and data.");
}
break;
}
case TYPE_OPEN_FILE: {
std::shared_ptr<TransInfomation> t = nullptr;
std::string id = buf->fid_;
if (buf->mark_ == 0) {
std::lock_guard<std::mutex> lock(mutex_);
up_[buf->fid_] = std::make_shared<TransInfomation>();
t = up_[buf->fid_];
2024-12-17 09:34:37 +08:00
#ifdef _WIN32
t->cur_file_ = CCodec::u8ToGBK(std::string(buf->data_, buf->len_));
#else
t->cur_file_ = std::string(buf->data_, buf->len_);
2024-12-17 09:34:37 +08:00
#endif
t->file_ = fopen(t->cur_file_.c_str(), "rb");
} else {
t = down_;
t->file_ = fopen(t->cur_file_.c_str(), "wb");
}
if (t->file_ == nullptr) {
logger_->error("open file {} failed.", t->cur_file_);
report_trans_ret(TRANS_FAILED, buf->mark_ == 0 ? id : "");
break;
}
std::shared_ptr<CFrameBuffer> tmp = std::make_shared<CFrameBuffer>();
tmp->type_ = TYPE_READY_TRANS;
tmp->mark_ = buf->mark_ == 0 ? 1 : 0;
tmp->tid_ = buf->fid_;
if (!send_frame(tmp.get())) {
logger_->error("TYPE_OPEN_FILE send ready failed.");
report_trans_ret(TRANS_FAILED, buf->mark_ == 0 ? id : "");
}
break;
}
case TYPE_TRANS_DONE: {
logger_->warn("Trans done, close file {}.", down_->cur_file_);
if (down_->file_) {
fclose(down_->file_);
down_->file_ = nullptr;
}
down_->trans_state_ = TRANS_DONE;
break;
}
default:
break;
}
}
void CClient::send_file_data_th()
{
std::string str_key = work_key_;
std::shared_ptr<TransInfomation> t = nullptr;
{
std::lock_guard<std::mutex> lock(mutex_);
if (!up_.count(str_key)) {
logger_->error("{} no matched key.", __FUNCTION__);
return;
}
t = up_[str_key];
}
logger_->info("Start Trans File {} To {}", t->cur_file_, str_key);
std::shared_ptr<CFrameBuffer> buf = std::make_shared<CFrameBuffer>();
buf->type_ = TYPE_TRANS_FILE;
buf->tid_ = str_key;
2024-12-14 23:59:13 +08:00
buf->data_ = new char[g_BuffSize]{};
buf->mark_ = 1;
while (!feof(t->file_)) {
2024-12-14 23:59:13 +08:00
buf->len_ = fread(buf->data_, 1, g_BuffSize, t->file_);
if (!send_frame(buf.get())) {
report_trans_ret(TRANS_FAILED, str_key);
logger_->error("send_file_data_th send failed.");
return;
}
}
buf->type_ = TYPE_TRANS_DONE;
if (!send_frame(buf.get())) {
logger_->error("send_file_data_th send DONE failed.");
}
report_trans_ret(TRANS_DONE, str_key);
logger_->debug("Trans File {} To {} Done !!!", t->cur_file_, str_key);
}
void CClient::hearts()
{
std::shared_ptr<CFrameBuffer> buf = std::make_shared<CFrameBuffer>();
buf->type_ = TYPE_HEARTS;
while (th_run_) {
sleep_.sleep();
if (!send_frame(buf.get())) {
logger_->error("{} send failed.", __FUNCTION__);
th_run_ = false;
}
}
}