#include "server.h" CTcpServer::CTcpServer(asio::io_context& io_context, const std::shared_ptr& logger) : io_context_(io_context), logger_(logger), acceptor_(io_context) { th_run_ = true; handle_pool_ = std::make_shared(1); send_pool_ = std::make_shared(1); handle_pool_->init(); send_pool_->init(); handle_pool_->submit([&]() { handle_frame(); }); send_pool_->submit([&]() { send_simple_buf(); }); } CTcpServer::~CTcpServer() { th_run_ = false; handle_pool_->close_wait_all(); send_pool_->close_wait_all(); } bool CTcpServer::start(unsigned short port) { asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), port); try { acceptor_.open(endpoint.protocol()); acceptor_.set_option(asio::socket_base::reuse_address(true)); acceptor_.bind(endpoint); acceptor_.listen(); } catch (const asio::system_error& e) { logger_->error("Failed to bind to {}: {}", endpoint.address().to_string(), e.what()); return false; } accept_client(); logger_->info("Server started on port {}", port); return true; } void CTcpServer::stop() { acceptor_.close(); std::lock_guard lock(cli_mut_); for (auto& [key, thread] : client_threads_) { if (thread.joinable()) { thread.join(); } } client_threads_.clear(); } std::vector> CTcpServer::get_clients() { std::vector> result; std::lock_guard lock(cli_mut_); for (const auto& item : client_map_) { result.push_back(std::make_pair(item.first, item.second->task_)); } return result; } SimpleBuffer* CTcpServer::get_client_list() { CFrameBuffer* buf = new CFrameBuffer(); buf->type_ = 199; auto vec = get_clients(); std::string msg; for (const auto& item : vec) { if (msg.empty()) { msg.append(item.first + "," + item.second); } else { msg.append("|" + item.first + "," + item.second); } } buf->data_ = new char[msg.size() + 1]; buf->len_ = static_cast(msg.size() + 1); std::snprintf(buf->data_, buf->len_, "%s", msg.data()); SimpleBuffer* sbuf = new SimpleBuffer(); if (!CTransProtocal::pack(buf, &sbuf->data_, sbuf->len_)) { logger_->error("{} pack faile.", __FUNCTION__); delete sbuf; return nullptr; } return sbuf; } bool CTcpServer::push_frame(CFrameBuffer* buf) { std::lock_guard lock(buf_mut_); cache_.push(buf); return true; } void CTcpServer::handle_frame() { CFrameBuffer* buf = nullptr; while (th_run_) { { std::lock_guard lock(buf_mut_); if (cache_.size() > 0) { buf = cache_.front(); cache_.pop(); } } if (!buf) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); continue; } // 拿到该包后,要看转发给谁或者处理 if (buf->type_ == 199) { // 询问在线客户端 auto* sbuf = get_client_list(); if (sbuf == nullptr) { continue; } sbuf->id_ = buf->id_; std::lock_guard lock(sbuf_mut_); scache_.push(sbuf); } delete buf; buf = nullptr; } } void CTcpServer::send_simple_buf() { SimpleBuffer* buf = nullptr; while (th_run_) { { std::lock_guard slock(sbuf_mut_); if (scache_.size() > 0) { buf = scache_.front(); scache_.pop(); } } if (!buf) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); continue; } std::shared_ptr socket = nullptr; { std::lock_guard clock(cli_mut_); if (!client_map_.count(buf->id_)) { logger_->warn("{} abandon {}'s data.", __FUNCTION__, buf->id_); delete buf; continue; } socket = client_map_[buf->id_]->socket_; } socket->send(asio::buffer(buf->data_, buf->len_)); delete buf; buf = nullptr; } } void CTcpServer::accept_client() { auto socket = std::make_shared(io_context_); acceptor_.async_accept(*socket, [this, socket](const asio::error_code& error) { if (!error) { auto endpoint = socket->remote_endpoint(); std::string client_key = endpoint.address().to_string() + ":" + std::to_string(endpoint.port()); logger_->info("New connection from {}", client_key); { std::lock_guard lock(cli_mut_); auto cache = std::make_shared(); cache->socket_ = socket; client_map_[client_key] = cache; } client_threads_[client_key] = std::thread(&CTcpServer::th_client, this, socket, client_key); } accept_client(); }); } void CTcpServer::th_client(std::shared_ptr socket, const std::string& client_key) { std::shared_ptr deleter(new int(0), [&](int* p) { std::lock_guard lock(cli_mut_); delete p; client_map_.erase(client_key); if (client_threads_.find(client_key) != client_threads_.end()) { client_threads_.at(client_key).detach(); client_threads_.erase(client_key); } logger_->warn("{} client {} exit.", __FUNCTION__, client_key); }); try { std::shared_ptr cache = nullptr; { std::lock_guard lock(cli_mut_); if (!client_map_.count(client_key)) { logger_->error("Not Find Client{} in cache.", client_key); return; } cache = client_map_[client_key]; } while (true) { asio::error_code error; size_t length = socket->read_some(asio::buffer(cache->tmp_buf_), error); if (error == asio::error::eof) { logger_->info("Connection closed by client: {}", client_key); break; } else if (error) { throw asio::system_error(error); } cache->buffer_.push(cache->tmp_buf_.data(), length); auto* frame = CTransProtocal::parse(cache->buffer_); if (frame) { frame->id_ = client_key; push_frame(frame); } } } catch (std::exception& e) { logger_->error("Error with client {}: {}", client_key, e.what()); } }