change:微调整一些细节,类型改为枚举。

This commit is contained in:
taynpg 2024-12-14 11:57:33 +08:00
parent 19fa6e3d48
commit 97c3e80230
8 changed files with 120 additions and 38 deletions

View File

@ -32,10 +32,22 @@
`mark``0`时表示数据的最后一包,其他数据表示非最后一包。
`type`: 199,特殊标记,表示询问在线客户端及挂载任务。
`type`: 199,表示询问在线客户端及挂载任务。
`type`: 198,特殊标记,下载任务
`type`: 198,下载清单文件
`type`: 197,特殊标记,上载任务
`type`: 197,上载清单
`type`: 196,特殊标记,取消上载任务。
`type`: 196,取消上载任务。
`type`: 195,请求打开文件,返回mark值为1表示OK,为0表示失败。
`type`: 194,可以传输文件。
`type`: 193,文件数据。
`type`: 192,传输结束。
`type`:191,异常中断传输。
`type`:190,没有此清单。

View File

@ -72,11 +72,14 @@ void CClient::run()
bool CClient::get_task_list()
{
return send_frame(199, gGet, std::strlen(gGet));
std::shared_ptr<CFrameBuffer> buf = std::make_shared<CFrameBuffer>();
buf->type_ = TYPE_GET_LIST;
return send_frame(buf.get());
}
bool CClient::down_task()
{
// if (send_frame(198, ))
return false;
}
@ -91,25 +94,30 @@ bool CClient::up_task(const std::string& cmd)
msg.append("|" + item);
}
}
return send_frame(197, msg.data(), msg.size());
if (msg.empty()) {
logger_->warn("{} msg empty.", __FUNCTION__);
return false;
}
std::shared_ptr<CFrameBuffer> buf = std::make_shared<CFrameBuffer>();
buf->type_ = TYPE_UP_LIST;
buf->data_ = new char[msg.size()];
std::memset(buf->data_, 0x0, msg.size());
buf->len_ = std::snprintf(buf->data_, msg.size(), "%s", msg.data());
return send_frame(buf.get());
}
bool CClient::cancel_task()
{
char buffer[] = "None";
return send_frame(196, buffer, sizeof(buffer));
std::shared_ptr<CFrameBuffer> buf = std::make_shared<CFrameBuffer>();
buf->type_ = TYPE_CANCEL_LIST;
return send_frame(buf.get());
}
bool CClient::send_frame(int type, const char* data, int len)
bool CClient::send_frame(CFrameBuffer* buf)
{
std::shared_ptr<CFrameBuffer> buf = std::make_shared<CFrameBuffer>();
buf->type_ = type;
buf->data_ = new char[len + 1];
std::memset(buf->data_, 0x0, len + 1);
buf->len_ = std::snprintf(buf->data_, len + 1, "%s", data);
char* out_buf{};
int out_len{};
if (!CTransProtocal::pack(buf.get(), &out_buf, out_len)) {
if (!CTransProtocal::pack(buf, &out_buf, out_len)) {
logger_->error("{} pack failed.", __FUNCTION__);
return false;
}
@ -138,8 +146,7 @@ void CClient::handle_frame(CFrameBuffer* buf)
for (const auto& item : vec) {
if (item.find("[") == std::string::npos) {
logger_->info("FILE ==> {}", item);
}
else {
} else {
logger_->debug("***********************************************");
logger_->debug("{}", item);
}

View File

@ -21,7 +21,7 @@ public:
bool cancel_task();
private:
bool send_frame(int type, const char* data, int len);
bool send_frame(CFrameBuffer* buf);
private:
void handle_frame(CFrameBuffer* buf);

View File

@ -74,7 +74,7 @@ std::vector<TaskList> CTcpServer::get_clients()
SimpleBuffer* CTcpServer::get_client_list()
{
CFrameBuffer* buf = new CFrameBuffer();
buf->type_ = 199;
buf->type_ = TYPE_GET_LIST;
auto vec = get_clients();
std::string msg;
@ -122,31 +122,52 @@ void CTcpServer::handle_frame()
continue;
}
// 拿到该包后,要看转发给谁或者处理
if (buf->type_ == 199) { // 询问在线客户端
FrameType t = static_cast<FrameType>(buf->type_);
switch (t) {
case TYPE_GET_LIST: {
logger_->info("GetList.");
auto* sbuf = get_client_list();
if (sbuf == nullptr) {
continue;
break;
}
sbuf->id_ = buf->id_;
sbuf->id_ = buf->fid_;
std::lock_guard<std::mutex> lock(sbuf_mut_);
scache_.push(sbuf);
} else if (buf->type_ == 197) {
break;
}
case TYPE_UP_LIST: {
logger_->info("UpList. {}", std::string(buf->data_, buf->len_));
std::lock_guard<std::mutex> lock(cli_mut_);
if (client_map_.count(buf->id_)) {
auto& cli = client_map_[buf->id_];
if (client_map_.count(buf->fid_)) {
auto& cli = client_map_[buf->fid_];
cli->task_ = std::string(buf->data_, buf->len_);
cli->time_ = OfUtil::now_time();
}
} else if (buf->type_ == 196) {
break;
}
case TYPE_CANCEL_LIST: {
logger_->info("Cancle Task.");
std::lock_guard<std::mutex> lock(cli_mut_);
if (client_map_.count(buf->id_)) {
auto& cli = client_map_[buf->id_];
if (client_map_.count(buf->fid_)) {
auto& cli = client_map_[buf->fid_];
cli->task_.clear();
}
break;
}
case TYPE_OPEN_FILE:
case TYPE_READY_TRANS:
case TYPE_TRANS_FILE: {
std::lock_guard<std::mutex> lock(cli_mut_);
if (client_map_.count(buf->tid_)) {
auto& cli = client_map_[buf->tid_];
if (!send_frame(cli->socket_, buf)) {
logger_->error("turn failed to {}", buf->tid_);
}
}
break;
}
default:
break;
}
delete buf;
buf = nullptr;
@ -245,11 +266,28 @@ void CTcpServer::th_client(std::shared_ptr<asio::ip::tcp::socket> socket, const
cache->buffer_.push(cache->tmp_buf_.data(), length);
auto* frame = CTransProtocal::parse(cache->buffer_);
if (frame) {
frame->id_ = client_key;
frame->fid_ = client_key;
push_frame(frame);
}
}
} catch (std::exception& e) {
logger_->error("Error with client {}: {}", client_key, e.what());
}
}
}
bool CTcpServer::send_frame(std::shared_ptr<asio::ip::tcp::socket> socket, CFrameBuffer* buf)
{
char* out_buf{};
int out_len{};
if (!CTransProtocal::pack(buf, &out_buf, out_len)) {
logger_->error("{} pack failed.", __FUNCTION__);
return false;
}
if (!socket->send(asio::buffer(out_buf, out_len))) {
logger_->error("{} send failed.", __FUNCTION__);
delete[] out_buf;
return false;
}
delete[] out_buf;
return true;
}

View File

@ -13,6 +13,7 @@ struct ClientCache {
std::array<char, 1024> tmp_buf_;
std::string task_;
std::string time_;
FrameType cur_type_{TYPE_DEFAULT};
};
struct TaskList {
std::string id_;
@ -43,6 +44,12 @@ private:
void accept_client();
void th_client(std::shared_ptr<asio::ip::tcp::socket> socket, const std::string& client_key);
/// @brief 不删除 buf
/// @param socket
/// @param buf
/// @return
bool send_frame(std::shared_ptr<asio::ip::tcp::socket> socket, CFrameBuffer* buf);
private:
bool th_run_{false};
asio::io_context& io_context_;

View File

@ -6,7 +6,7 @@ std::shared_ptr<spdlog::logger> g_Logger;
void TestHandle(CFrameBuffer* buf)
{
std::string chinese_test("中文测试");
g_Logger->debug("type: {}", buf->type_);
g_Logger->debug("type: {}", static_cast<int>(buf->type_));
g_Logger->info("len: {}", buf->len_);
g_Logger->warn("{} exec. {} 1", __FUNCTION__, chinese_test);
g_Logger->error("{} exec. {} 2", __FUNCTION__, chinese_test);

View File

@ -59,7 +59,7 @@ CFrameBuffer* CTransProtocal::parse(CMutBuffer& buffer)
result->data_ = new char[len];
result->len_ = len;
result->mark_ = mark;
result->type_ = type;
result->type_ = static_cast<FrameType>(type);
std::memset(result->data_, 0x0, len);
std::memcpy(result->data_, buffer.get_data() + find + 2 + 2 + 1 + 4, len);
buffer.remove_of(0, tail_index + 2);
@ -68,9 +68,12 @@ CFrameBuffer* CTransProtocal::parse(CMutBuffer& buffer)
bool CTransProtocal::pack(CFrameBuffer* buf, char** out_buf, int& len)
{
if (buf == nullptr || buf->data_ == nullptr || buf->len_ < 1) {
if (buf == nullptr) {
return false;
}
if (buf->data_ == nullptr) {
buf->len_ = 0;
}
unsigned char header[] = {0xFF, 0xFE};
unsigned char tail[] = {0xFF, 0xFF};
len = buf->len_ + 11;
@ -79,7 +82,9 @@ bool CTransProtocal::pack(CFrameBuffer* buf, char** out_buf, int& len)
std::memcpy(*out_buf + 2, &buf->type_, 2);
std::memcpy(*out_buf + 2 + 2, &buf->mark_, 1);
std::memcpy(*out_buf + 2 + 2 + 1, &buf->len_, 4);
std::memcpy(*out_buf + 2 + 2 + 1 + 4, buf->data_, buf->len_);
if (buf->data_ != nullptr) {
std::memcpy(*out_buf + 2 + 2 + 1 + 4, buf->data_, buf->len_);
}
std::memcpy(*out_buf + len - 2, tail, 2);
return true;
}

View File

@ -6,7 +6,19 @@
#include <spdlog/sinks/stdout_color_sinks.h>
#include <spdlog/spdlog.h>
constexpr auto gGet = "Get";
enum FrameType : int16_t {
TYPE_DEFAULT = 0,
TYPE_GET_LIST,
TYPE_DOWN_LIST,
TYPE_UP_LIST,
TYPE_CANCEL_LIST,
TYPE_OPEN_FILE,
TYPE_TRANS_FILE,
TYPE_READY_TRANS,
TYPE_INTERRUPT,
TYPE_NO_HIT_TASK,
TYPE_WAITTING
};
using namespace ofen;
std::shared_ptr<spdlog::logger> get_logger(const std::string& mark, const std::string& log_file);
@ -17,10 +29,11 @@ public:
~CFrameBuffer();
public:
std::string id_{};
std::string fid_{};
std::string tid_{};
public:
int16_t type_{};
FrameType type_{};
char* data_{};
int len_{};
char mark_{};