transm/server/server.cpp

255 lines
7.8 KiB
C++

#include "server.h"

#include <memory>
#include <utility>
#include <vector>

#include <of_str.h>
using namespace ofen;
// Size of the pool whose tasks run CTcpServer::handle_frame() (inbound frame queue).
constexpr int g_ParseThreadNum = 1;
// Size of the pool whose tasks run CTcpServer::send_simple_buf() (outbound buffer queue).
constexpr int g_SendThreadNum = 1;
/// Constructs the server and starts its background workers.
///
/// Stores the io_context and logger, creates the two thread pools and submits
/// one long-running task per configured worker: handle_frame() for the parse
/// pool and send_simple_buf() for the send pool. No socket is opened here;
/// call start() for that.
///
/// @param io_context  io_context used for the acceptor and for client sockets.
/// @param logger      logger used for all diagnostics of this server.
CTcpServer::CTcpServer(asio::io_context& io_context, const std::shared_ptr<spdlog::logger>& logger)
    : io_context_(io_context), logger_(logger), acceptor_(io_context)
{
    // Must be set before the workers are submitted: both loops poll this flag.
    th_run_ = true;

    handle_pool_ = std::make_shared<CThreadPool>(g_ParseThreadNum);
    send_pool_ = std::make_shared<CThreadPool>(g_SendThreadNum);
    handle_pool_->init();
    send_pool_->init();

    for (int i = 0; i < g_ParseThreadNum; ++i) {
        handle_pool_->submit([this]() { handle_frame(); });
    }
    // Fixed: this loop used g_ParseThreadNum as its bound, so the number of
    // send workers silently followed the parse setting instead of its own.
    for (int i = 0; i < g_SendThreadNum; ++i) {
        send_pool_->submit([this]() { send_simple_buf(); });
    }
}
// Shuts down the background workers: clears the run flag polled by
// handle_frame() and send_simple_buf(), then closes both thread pools.
// NOTE(review): client threads stored in client_threads_ are not joined here;
// it looks like callers are expected to call stop() beforehand — confirm.
CTcpServer::~CTcpServer()
{
// Cleared first so the worker loops can leave their while (th_run_) loop.
th_run_ = false;
// Presumably blocks until the submitted tasks have returned — verify against CThreadPool.
handle_pool_->close_wait_all();
send_pool_->close_wait_all();
}
bool CTcpServer::start(unsigned short port)
{
asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), port);
try {
acceptor_.open(endpoint.protocol());
acceptor_.set_option(asio::socket_base::reuse_address(true));
acceptor_.bind(endpoint);
acceptor_.listen();
} catch (const asio::system_error& e) {
logger_->error("Failed to bind to {}: {}", endpoint.address().to_string(), e.what());
return false;
}
accept_client();
logger_->info("Server started on port {}", port);
return true;
}
void CTcpServer::stop()
{
acceptor_.close();
std::lock_guard<std::mutex> lock(cli_mut_);
for (auto& [key, thread] : client_threads_) {
if (thread.joinable()) {
thread.join();
}
}
client_threads_.clear();
}
std::vector<TaskList> CTcpServer::get_clients()
{
std::vector<TaskList> result;
std::lock_guard<std::mutex> lock(cli_mut_);
for (const auto& item : client_map_) {
TaskList t;
t.id_ = item.first;
t.task_ = item.second->task_;
t.time_ = item.second->time_;
result.push_back(t);
}
return result;
}
/// Builds the packed reply for a "list clients" request (frame type 199).
///
/// The payload is text: for every client "[index][id][time]" followed by one
/// "\n<file>" line per '|'-separated entry of its task string.
///
/// @return a heap-allocated SimpleBuffer owned by the caller, or nullptr when
///         packing fails (the failure is logged).
SimpleBuffer* CTcpServer::get_client_list()
{
    std::string msg;
    int index = 1;
    const auto clients = get_clients();
    for (const auto& item : clients) {
        msg.append(fmt::format("[{}][{}][{}]", index, item.id_, item.time_));
        auto files = COfStr::split(item.task_, "|");
        for (const auto& file : files) {
            msg.append("\n" + file);
        }
        // Fixed: the counter was never advanced, so every client was
        // reported with index 1.
        ++index;
    }

    // Fixed: the temporary frame was allocated with new and never released.
    // It is only an input to pack(), so it is owned locally and freed on
    // every return path.
    // NOTE(review): assumes ~CFrameBuffer releases data_ (handle_frame relies
    // on a plain delete of frames as well) — confirm.
    auto frame = std::make_unique<CFrameBuffer>();
    frame->type_ = 199;
    frame->data_ = new char[msg.size() + 1];
    frame->len_ = static_cast<int>(msg.size() + 1);
    std::snprintf(frame->data_, frame->len_, "%s", msg.data());

    auto packed = std::make_unique<SimpleBuffer>();
    if (!CTransProtocal::pack(frame.get(), &packed->data_, packed->len_)) {
        logger_->error("{} pack faile.", __FUNCTION__);
        return nullptr;
    }
    return packed.release();
}
/// Appends a parsed frame to the inbound queue consumed by handle_frame().
///
/// @param buf  frame to enqueue; handle_frame() deletes it after processing.
/// @return always true.
bool CTcpServer::push_frame(CFrameBuffer* buf)
{
    {
        const std::lock_guard<std::mutex> guard(buf_mut_);
        cache_.push(buf);
    }
    return true;
}
void CTcpServer::handle_frame()
{
CFrameBuffer* buf = nullptr;
while (th_run_) {
{
std::lock_guard<std::mutex> lock(buf_mut_);
if (cache_.size() > 0) {
buf = cache_.front();
cache_.pop();
}
}
if (!buf) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
continue;
}
// 拿到该包后,要看转发给谁或者处理
if (buf->type_ == 199) { // 询问在线客户端
logger_->info("GetList.");
auto* sbuf = get_client_list();
if (sbuf == nullptr) {
continue;
}
sbuf->id_ = buf->id_;
std::lock_guard<std::mutex> lock(sbuf_mut_);
scache_.push(sbuf);
} else if (buf->type_ == 197) {
logger_->info("UpList. {}", std::string(buf->data_, buf->len_));
std::lock_guard<std::mutex> lock(cli_mut_);
if (client_map_.count(buf->id_)) {
auto& cli = client_map_[buf->id_];
cli->task_ = std::string(buf->data_, buf->len_);
cli->time_ = OfUtil::now_time();
}
} else if (buf->type_ == 196) {
logger_->info("Cancle Task.");
std::lock_guard<std::mutex> lock(cli_mut_);
if (client_map_.count(buf->id_)) {
auto& cli = client_map_[buf->id_];
cli->task_.clear();
}
}
delete buf;
buf = nullptr;
}
}
void CTcpServer::send_simple_buf()
{
SimpleBuffer* buf = nullptr;
while (th_run_) {
{
std::lock_guard<std::mutex> slock(sbuf_mut_);
if (scache_.size() > 0) {
buf = scache_.front();
scache_.pop();
}
}
if (!buf) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
continue;
}
std::shared_ptr<asio::ip::tcp::socket> socket = nullptr;
{
std::lock_guard<std::mutex> clock(cli_mut_);
if (!client_map_.count(buf->id_)) {
logger_->warn("{} abandon {}'s data.", __FUNCTION__, buf->id_);
delete buf;
continue;
}
socket = client_map_[buf->id_]->socket_;
}
socket->send(asio::buffer(buf->data_, buf->len_));
delete buf;
buf = nullptr;
}
}
/// Queues one asynchronous accept. On success the new client is registered
/// in client_map_ under "ip:port", a dedicated thread running th_client() is
/// started for it, and the next accept is queued.
void CTcpServer::accept_client()
{
    auto socket = std::make_shared<asio::ip::tcp::socket>(io_context_);
    acceptor_.async_accept(*socket, [this, socket](const asio::error_code& error) {
        // Fixed: do not re-arm once the acceptor has been closed (stop()).
        // Previously every failed accept queued another one, which fails
        // again immediately on a closed acceptor and never lets the
        // io_context run out of work.
        if (error == asio::error::operation_aborted || !acceptor_.is_open()) {
            return;
        }

        if (!error) {
            // Fixed: the throwing remote_endpoint() overload could propagate
            // out of the completion handler when the peer disconnects right
            // after being accepted.
            asio::error_code ep_error;
            const auto endpoint = socket->remote_endpoint(ep_error);
            if (ep_error) {
                logger_->warn("Accepted connection dropped before setup: {}", ep_error.message());
            } else {
                const std::string client_key =
                    endpoint.address().to_string() + ":" + std::to_string(endpoint.port());
                logger_->info("New connection from {}", client_key);

                // Fixed: client_threads_ was modified outside cli_mut_ while
                // exiting client threads erase from it under that mutex.
                // Holding the lock across the insertion also guarantees the
                // new thread (which locks cli_mut_ first) sees its own entry.
                std::lock_guard<std::mutex> lock(cli_mut_);
                auto cache = std::make_shared<ClientCache>();
                cache->socket_ = socket;
                client_map_[client_key] = cache;
                client_threads_[client_key] = std::thread(&CTcpServer::th_client, this, socket, client_key);
            }
        }
        accept_client();
    });
}
/// Per-client thread body: reads from the client's socket, feeds the bytes
/// into the client's receive buffer, and pushes every complete frame onto
/// the inbound queue tagged with the client key.
///
/// The loop ends when the peer closes the connection or a read error occurs.
/// On every exit path the client is removed from client_map_ and its thread
/// entry (if still present) is detached and removed from client_threads_.
///
/// @param socket      connected socket of this client.
/// @param client_key  "ip:port" key under which the client is registered.
void CTcpServer::th_client(std::shared_ptr<asio::ip::tcp::socket> socket, const std::string& client_key)
{
    // Scope guard: the custom deleter runs when this function returns,
    // whichever path is taken.
    std::shared_ptr<int> deleter(new int(0), [&](int* p) {
        std::lock_guard<std::mutex> lock(cli_mut_);
        delete p;
        client_map_.erase(client_key);
        const auto it = client_threads_.find(client_key);
        if (it != client_threads_.end()) {
            // Fixed: detach() on a non-joinable std::thread throws
            // std::system_error, which is fatal when raised from a deleter.
            if (it->second.joinable()) {
                it->second.detach();
            }
            client_threads_.erase(it);
        }
        logger_->warn("{} client {} exit.", __FUNCTION__, client_key);
    });

    try {
        std::shared_ptr<ClientCache> cache = nullptr;
        {
            std::lock_guard<std::mutex> lock(cli_mut_);
            const auto it = client_map_.find(client_key);
            if (it == client_map_.end()) {
                logger_->error("Not Find Client{} in cache.", client_key);
                return;
            }
            cache = it->second;
        }

        while (true) {
            asio::error_code error;
            const size_t length = socket->read_some(asio::buffer(cache->tmp_buf_), error);
            if (error == asio::error::eof) {
                logger_->info("Connection closed by client: {}", client_key);
                break;
            } else if (error) {
                throw asio::system_error(error);
            }

            cache->buffer_.push(cache->tmp_buf_.data(), length);
            // Fixed: drain every complete frame. A single read may carry
            // several frames; parsing only one per read left the rest
            // waiting until more data arrived.
            // NOTE(review): relies on parse() removing the bytes of the frame
            // it returns from the buffer — confirm.
            while (auto* frame = CTransProtocal::parse(cache->buffer_)) {
                frame->id_ = client_key;
                push_frame(frame);
            }
        }
    } catch (const std::exception& e) {
        logger_->error("Error with client {}: {}", client_key, e.what());
    }
}