#include "net_base.h"

#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>

// NOTE(review): the reviewed text of this file had lost its template
// arguments (e.g. `std::shared_ptr&`, `std::make_shared(...)`,
// `static_cast(...)`). They are restored below. The socket and cast types
// follow directly from how the values are used; the logger element type is
// assumed to be `spdlog::logger` based on the `{}`-style info/error/debug
// calls -- confirm against the declarations in net_base.h.

/// @brief Builds a server bound to @p io_context; no socket is opened until Start().
/// @param io_context Context that runs the asynchronous accept handler.
/// @param logger     Logger used for all diagnostics emitted by the server.
CTcpServer::CTcpServer(asio::io_context& io_context, const std::shared_ptr<spdlog::logger>& logger)
    : io_context_(io_context), logger_(logger), acceptor_(io_context)
{
}

/// @brief Destructor. Does not call Stop(); the owner is expected to do so.
CTcpServer::~CTcpServer()
{
}

/// @brief Opens the acceptor on the IPv4 wildcard address and starts accepting.
/// @param port TCP port to listen on.
/// @return true when the acceptor is listening and the first accept is queued,
///         false when open/bind/listen failed (the error is logged).
bool CTcpServer::Start(unsigned short port)
{
    asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), port);
    try {
        acceptor_.open(endpoint.protocol());
        acceptor_.set_option(asio::socket_base::reuse_address(true));
        acceptor_.bind(endpoint);
        acceptor_.listen();
    } catch (const asio::system_error& e) {
        logger_->error("Failed to bind to {}: {}", endpoint.address().to_string(), e.what());
        // Leave the acceptor closed so that a later Start() can open it again
        // instead of failing with "already open".
        asio::error_code ignored;
        acceptor_.close(ignored);
        return false;
    }

    Accept();
    logger_->info("Server started on port {}", port);
    return true;
}

/// @brief Closes the acceptor and waits for all tracked client threads.
///
/// The thread table is moved out under the lock and joined afterwards:
/// HandleClient() takes the same mutex when it finishes, so joining while
/// holding it would deadlock.
///
/// @note Client sockets are not closed here and HandleClient() uses blocking
///       reads, so each join returns only once that client's read loop ends.
void CTcpServer::Stop()
{
    acceptor_.close();

    decltype(client_threads_) pending;
    {
        std::lock_guard lock(mutex_);
        pending.swap(client_threads_);
    }

    for (auto& [key, thread] : pending) {
        if (thread.joinable()) {
            thread.join();
        }
    }
}

/// @brief Queues one asynchronous accept; the handler re-arms itself.
///
/// For every accepted connection a dedicated thread running HandleClient()
/// is started and recorded in client_threads_ under the "ip:port" key.
void CTcpServer::Accept()
{
    auto socket = std::make_shared<asio::ip::tcp::socket>(io_context_);
    acceptor_.async_accept(*socket, [this, socket](const asio::error_code& error) {
        // The acceptor was closed (Stop()): do not queue another accept,
        // otherwise the handler would keep re-arming on a dead acceptor.
        if (error == asio::error::operation_aborted || !acceptor_.is_open()) {
            return;
        }

        if (error) {
            logger_->error("Accept failed: {}", error.message());
        } else {
            // Use the non-throwing overload: the peer may already be gone and
            // an exception must not escape the completion handler.
            asio::error_code endpoint_error;
            const auto endpoint = socket->remote_endpoint(endpoint_error);
            if (endpoint_error) {
                logger_->error("Failed to query remote endpoint: {}", endpoint_error.message());
            } else {
                const std::string client_key =
                    endpoint.address().to_string() + ":" + std::to_string(endpoint.port());
                logger_->info("New connection from {}", client_key);

                // Both tables are shared with the client threads, so the
                // thread is created and registered under the same lock.
                std::lock_guard lock(mutex_);
                client_map_[client_key] = 0;  // Initial value as 0
                auto& slot = client_threads_[client_key];
                if (slot.joinable()) {
                    // Assigning over a joinable std::thread calls std::terminate.
                    slot.detach();
                }
                slot = std::thread(&CTcpServer::HandleClient, this, socket, client_key);
            }
        }

        Accept();
    });
}

/// @brief Per-client thread body: echoes everything received back to the peer.
/// @param socket     Connected client socket (shared with the accept handler).
/// @param client_key "ip:port" key identifying this client in the tables.
///
/// Runs until the peer closes the connection or an I/O error occurs, then
/// removes the client's entries from client_map_ and client_threads_.
void CTcpServer::HandleClient(std::shared_ptr<asio::ip::tcp::socket> socket, const std::string& client_key)
{
    try {
        char data[1024];
        while (true) {
            asio::error_code error;
            const size_t length = socket->read_some(asio::buffer(data), error);
            if (error == asio::error::eof) {
                logger_->info("Connection closed by client: {}", client_key);
                break;
            }
            if (error) {
                throw asio::system_error(error);
            }

            // asio::write loops until the whole buffer is sent; a single
            // send() may transmit only part of it.
            const auto relen = asio::write(*socket, asio::buffer(data, length));
            logger_->info("Received data from {}, len={}, relen={}", client_key, length, relen);
        }
    } catch (const std::exception& e) {
        logger_->error("Error with client {}: {}", client_key, e.what());
    }

    std::lock_guard lock(mutex_);
    client_map_.erase(client_key);
    // Only drop the table entry when it is this very thread: after Stop() the
    // entry is gone (Stop() joins us), and after a reconnect with the same key
    // the entry belongs to a newer thread.
    if (auto it = client_threads_.find(client_key);
        it != client_threads_.end() && it->second.get_id() == std::this_thread::get_id()) {
        it->second.detach();
        client_threads_.erase(it);
    }
}

/// @brief Builds an unconnected client using @p io_context.
/// @param io_context Context used for resolving, connecting and async reads.
/// @param logger     Logger used for all diagnostics emitted by the client.
CTcpClient::CTcpClient(asio::io_context& io_context, const std::shared_ptr<spdlog::logger>& logger)
    // socket_ is built from the constructor argument rather than the
    // io_context_ member so it does not depend on member declaration order.
    : logger_(logger), io_context_(io_context), socket_(io_context)
{
}

/// @brief Destructor. Does not call disconnect() explicitly.
CTcpClient::~CTcpClient()
{
}

/// @brief Resolves @p host / @p port and connects the socket synchronously.
/// @return true on success, false on any failure (the error is logged).
bool CTcpClient::connect(const std::string& host, const std::string& port)
{
    try {
        asio::ip::tcp::resolver resolver(io_context_);
        asio::ip::tcp::resolver::results_type endpoints = resolver.resolve(host, port);
        asio::connect(socket_, endpoints);
        logger_->info("Connected to {}:{}", host, port);
        return true;
    } catch (const std::exception& ex) {
        logger_->error("Connection failed: {}", ex.what());
        return false;
    }
}

/// @brief Closes the socket if it is open; close errors are logged, not thrown.
void CTcpClient::disconnect()
{
    if (!socket_.is_open()) {
        return;
    }
    try {
        socket_.close();
        logger_->info("Disconnected.");
    } catch (const std::exception& ex) {
        logger_->error("Error during disconnection: {}", ex.what());
    }
}

/// @brief Synchronously writes @p len bytes starting at @p data.
/// @param data Pointer to the bytes to send; must not be null when len > 0.
/// @param len  Number of bytes to send; negative values are rejected.
/// @return true when exactly @p len bytes were written, false otherwise.
bool CTcpClient::send(const char* data, int len)
{
    // A negative length would be converted to a huge unsigned buffer size.
    if (len < 0 || (data == nullptr && len > 0)) {
        logger_->error("Send failed: invalid buffer (len={})", len);
        return false;
    }

    try {
        const auto send_size = asio::write(socket_, asio::buffer(data, static_cast<std::size_t>(len)));
        logger_->info("Need Send len: {} Real Send len: {}", len, send_size);
        return send_size == static_cast<std::size_t>(len);
    } catch (const std::exception& ex) {
        logger_->error("Send failed: {}", ex.what());
        return false;
    }
}

/// @brief Installs the callback invoked by async_recv() for each parsed frame.
/// @param f Callback; it is moved into the client.
void CTcpClient::register_func(ExFun_t&& f)
{
    // A named rvalue reference is an lvalue; without std::move this copies.
    fun_ = std::move(f);
}

/// @brief Starts (or continues) the asynchronous receive loop.
///
/// Each completed read appends the received bytes to buffer_, asks
/// CTransProtocal::parse() for a frame and, if one is returned, passes it to
/// the registered callback before releasing it. The loop re-arms itself until
/// a read reports an error. The object keeps itself alive for the duration of
/// the pending read via shared_from_this().
void CTcpClient::async_recv()
{
    auto self(shared_from_this());
    socket_.async_read_some(asio::buffer(tmp_buf_), [this, self](std::error_code ec, std::size_t length) {
        logger_->debug("{} {}", __FUNCTION__, ec.message());
        if (ec) {
            return;
        }

        {
            std::lock_guard lock(mutex_);
            buffer_.push(tmp_buf_.data(), length);

            // NOTE(review): only one frame is parsed per read, as before;
            // whether parse() can yield further frames from the same buffer
            // is not visible here -- confirm against CTransProtocal.
            auto* raw_frame = CTransProtocal::parse(buffer_);
            if (raw_frame) {
                // Owning wrapper so the frame is released even if the
                // callback throws.
                std::unique_ptr<std::remove_pointer_t<decltype(raw_frame)>> frame(raw_frame);
                if (fun_) {
                    fun_(frame.get());
                }
            }
        }

        async_recv();
    });
}