#include "RelayServer.h"

#include <chrono>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <utility>

// NOTE(review): the second #include in the recovered source had lost its
// target. <wx/evtloop.h> is listed because Run() uses wxEventLoop — confirm
// against the original file.
#include <wx/evtloop.h>

namespace {

/// Formats a socket address as the "ip:port" string used as a peer id.
wxString MakeEndpointId(const wxIPV4address& addr)
{
    return wxString::Format("%s:%d", addr.IPAddress(), addr.Service());
}

}  // namespace

/// Constructs an idle server; nothing is opened until Init() is called.
RelayServer::RelayServer() = default;

/// Creates the listening socket on ip:port and subscribes to its events.
///
/// @param ip    Host name or address handed to wxIPV4address::Hostname().
/// @param port  Port to listen on.
/// @return true when the socket was created and its local address could be
///         read back; false (after logging the reason) otherwise.
bool RelayServer::Init(const wxString& ip, unsigned short port)
{
    thRun_ = true;

    wxIPV4address addr;
    if (!addr.Hostname(ip)) {
        wxLogError(wxT("Invalid IP address: %s"), ip);
        return false;
    }
    addr.Service(port);

    // NOTE(review): template argument reconstructed from the Accept() call
    // made on server_ below — confirm it matches the member's declared type.
    server_ = std::make_unique<wxSocketServer>(addr);
    if (!server_->IsOk()) {
        wxLogError(wxT("Failed to create server socket."));
        return false;
    }

    // Re-read the address from the socket so the id reflects what was bound.
    if (!server_->GetLocal(addr)) {
        wxLogError(wxT("Failed to get local address."));
        return false;
    }
    strID_ = MakeEndpointId(addr);
    wxLogMessage(wxT("Server socket created on %s:%d"), addr.IPAddress(), addr.Service());
    // wxLogInfo(wxT("Server socket created on %s:%d"), addr.IPAddress(), addr.Service());

    serverId_ = wxNewId();
    // server_->SetFlags(wxSOCKET_NOWAIT);
    server_->SetEventHandler(*this, serverId_);
    server_->SetNotify(wxSOCKET_CONNECTION_FLAG | wxSOCKET_LOST_FLAG);
    server_->Notify(true);
    Bind(wxEVT_SOCKET, &RelayServer::OnServerEvent, this, serverId_);
    return true;
}

/// Runs a wxEventLoop on the calling thread.
///
/// @return The value returned by wxEventLoop::Run().
int RelayServer::Run()
{
    wxEventLoop loop;
    return loop.Run();
}

/// Handles socket events delivered for the listening socket.
///
/// On wxSOCKET_CONNECTION the pending connection is accepted, registered in
/// clients_ under its "ip:port" id and given a dedicated reader thread.
/// On wxSOCKET_LOST the peer's id is recomputed and its state is cleared.
void RelayServer::OnServerEvent(wxSocketEvent& event)
{
    switch (event.GetSocketEvent()) {
    case wxSOCKET_CONNECTION: {
        // NOTE(review): element type reconstructed from wxSocketServer::Accept().
        std::shared_ptr<wxSocketBase> newer(server_->Accept(false));
        if (!newer) {
            wxLogError(wxT("Failed to accept client connection."));
            return;
        }

        wxIPV4address addr;
        newer->GetPeer(addr);
        const wxString id = MakeEndpointId(addr);
        wxLogMessage(wxT("Client connected: %s"), id);

        // The client record type is taken from the map so it need not be named here.
        using ClientRecord = decltype(clients_)::mapped_type::element_type;
        auto client = std::make_shared<ClientRecord>();
        client->wxSock = newer;
        // NOTE(review): the duration unit was lost in the recovered source;
        // milliseconds is an assumption — confirm against the original file.
        client->onlineTime = std::chrono::duration_cast<std::chrono::milliseconds>(
                                 std::chrono::system_clock::now().time_since_epoch())
                                 .count();
        client->lastRecvTime = std::chrono::high_resolution_clock::now();

        std::unique_lock<decltype(clientsMutex_)> lock(clientsMutex_);
        clients_[id] = client;

        // Move-assigning over a joinable std::thread calls std::terminate, so
        // release any thread still stored under a reused id first.
        auto& worker = threads_[id];
        if (worker.joinable()) {
            worker.detach();
        }
        worker = std::thread(&RelayServer::thClientThread, this, newer, id);
        break;
    }
    case wxSOCKET_LOST: {
        auto* sock = event.GetSocket();
        wxIPV4address addr;
        sock->GetPeer(addr);
        const wxString id = MakeEndpointId(addr);
        wxLogMessage(wxT("disconnected: %s"), id);
        ClearClient(id);
        break;
    }
    default:
        break;
    }
}

/// Per-client reader loop, run on the thread created in OnServerEvent().
///
/// Tells the client its id, then repeatedly reads from the socket, feeds the
/// bytes into the client's buffer and dispatches every complete frame: frames
/// with dataType <= 30 are answered by the server itself, all others are
/// forwarded. The loop ends when thRun_ becomes false, the read returns zero
/// bytes, or the socket reports an error.
///
/// @param wxSock  Socket of the client served by this thread.
/// @param id      "ip:port" id under which the client is registered.
void RelayServer::thClientThread(const std::shared_ptr<wxSocketBase>& wxSock, const wxString& id)
{
    wxLogMessage(wxT("Client thread started: %s"), id);

    decltype(clients_)::mapped_type client;
    {
        // Use find(): operator[] could insert into the map, which is not
        // allowed while holding only a shared (reader) lock.
        std::shared_lock<decltype(clientsMutex_)> lock(clientsMutex_);
        const auto it = clients_.find(id);
        if (it != clients_.end()) {
            client = it->second;
        }
    }
    if (!client) {
        // The client was removed before this thread got to run.
        wxLogError(wxT("Client record not found: %s"), id);
        ClearClient(id);
        return;
    }

    // On first connection, tell the peer its own id.
    InfoCommunicate info;
    info.data = id.ToStdString();
    Send(client->wxSock, info, FBT_SER_MSG_YOURID, id.ToStdString(), strID_);

    client->wxSock->SetFlags(wxSOCKET_BLOCK);
    while (thRun_) {
        wxSock->Read(client->buf.data(), GBUFFER_SIZE);
        const auto br = wxSock->LastCount();
        if (br == 0) {
            wxLogMessage(wxT("Client disconnected: %s"), id);
            ClearClient(id);
            break;
        }
        if (wxSock->Error()) {
            // LastError() is an enum, so it is logged as a number, not with %s.
            wxLogMessage(wxT("%s Client error: %d"), id, static_cast<int>(wxSock->LastError()));
            ClearClient(id);
            break;
        }
        client->buffer.Push(client->buf.data(), br);

        // Drain every complete frame currently held in the buffer.
        while (true) {
            // ParseBuffer hands back an owning raw pointer (the original code
            // deleted it); unique_ptr frees it on every path.
            std::unique_ptr<FrameBuffer> frame(Communicate::ParseBuffer(client->buffer));
            if (!frame) {
                break;
            }
            frame->fid = id.ToStdString();
            if (frame->dataType <= 30) {
                frame->tid = strID_;
                ReplyRequest(client->wxSock, frame.get());
            } else {
                Forword(client->wxSock, frame.get());
            }
        }
    }
}

/// Forwards a frame to the client named by buf->tid.
///
/// If the target is unknown or the send fails, the sender (buf->fid), when
/// still registered, is notified through RpyForwordFailed().
///
/// @param wxSock  Socket the frame arrived on (not used here).
/// @param buf     Frame to forward; fid and tid select sender and target.
/// @return true if the frame was sent to the target, false otherwise.
bool RelayServer::Forword(const sockPtr& wxSock, FrameBuffer* buf)
{
    using ClientPtr = decltype(clients_)::mapped_type;
    ClientPtr fcl;
    ClientPtr tcl;
    {
        std::shared_lock<decltype(clientsMutex_)> lock(clientsMutex_);
        const auto from = clients_.find(buf->fid);
        if (from != clients_.end()) {
            fcl = from->second;
        }
        const auto to = clients_.find(buf->tid);
        if (to != clients_.end()) {
            tcl = to->second;
        }
    }

    if (tcl && Send(tcl->wxSock, buf)) {
        return true;
    }
    if (fcl) {
        RpyForwordFailed(fcl->wxSock, buf);
    }
    return false;
}

/// Answers a request frame addressed to the server itself.
///
/// Only FBT_SER_MSG_ASKCLIENTS is handled; other types are ignored.
///
/// @param wxSock  Socket of the requesting client.
/// @param frame   The request frame.
void RelayServer::ReplyRequest(const sockPtr& wxSock, FrameBuffer* frame)
{
    switch (frame->dataType) {
    case FBT_SER_MSG_ASKCLIENTS: {
        RpyOnline(wxSock, frame);
        wxLogDebug(wxT("Reply clients to %s clients list."), frame->fid);
        break;
    }
    default:
        break;
    }
}

/// Removes the client registered under id and releases its reader thread.
///
/// The thread is detached rather than joined because this function is also
/// called from inside that very thread.
///
/// @param id  "ip:port" id of the client to remove.
void RelayServer::ClearClient(const wxString& id)
{
    std::unique_lock<decltype(clientsMutex_)> lock(clientsMutex_);

    const auto clientIt = clients_.find(id);
    if (clientIt != clients_.end()) {
        wxLogDebug(wxT("client cleared: %s"), id);
        clients_.erase(clientIt);
    }

    const auto threadIt = threads_.find(id);
    if (threadIt != threads_.end()) {
        wxLogDebug(wxT("client thread detached: %s"), id);
        // detach() throws on a non-joinable thread, so check first.
        if (threadIt->second.joinable()) {
            threadIt->second.detach();
        }
        threads_.erase(threadIt);
    }
}

/// Sends the list of currently registered clients to the requester.
///
/// @param wxSock  Socket to send the list on.
/// @param frame   The request; its fid/tid are passed on to Send().
/// @return The result of Send().
bool RelayServer::RpyOnline(const sockPtr& wxSock, FrameBuffer* frame)
{
    InfoClientVec infoClients;
    {
        std::shared_lock<decltype(clientsMutex_)> lock(clientsMutex_);
        for (const auto& entry : clients_) {
            InfoClient infoClient;
            infoClient.id = entry.first;
            infoClient.name = entry.second->name;
            infoClients.vec.push_back(std::move(infoClient));
        }
    }
    return Send(wxSock, infoClients, FBT_SER_MSG_ASKCLIENTS, frame->fid, frame->tid);
}

/// Tells the sender that its frame could not be forwarded.
///
/// @param wxSock  Socket of the original sender.
/// @param frame   The frame that failed; its fid/tid are passed on to Send().
/// @return The result of Send().
bool RelayServer::RpyForwordFailed(const sockPtr& wxSock, FrameBuffer* frame)
{
    InfoCommunicate info;
    return Send(wxSock, info, FBT_SER_MSG_RESPONSE, frame->fid, frame->tid);
}

/// Packs a frame and writes it to the given socket.
///
/// NOTE(review): this can be reached from several client threads for the same
/// socket (e.g. through Forword()); no per-socket lock is visible in this
/// file — confirm whether writes need to be serialised.
///
/// @param wxSock  Destination socket.
/// @param buf     Frame to send; a null frame is rejected.
/// @return true if the frame was packed and written without a socket error.
bool RelayServer::Send(const sockPtr& wxSock, FrameBuffer* buf)
{
    if (buf == nullptr) {
        return false;
    }

    char* od = nullptr;
    int odLen = 0;
    if (!Communicate::PackBuffer(buf, &od, odLen)) {
        return false;
    }
    // Take ownership of the packed buffer (the original released it with
    // delete[]) so it is freed on every return path.
    const std::unique_ptr<char[]> packed(od);

    wxSock->Write(packed.get(), odLen);
    if (wxSock->Error()) {
        // LastError() is an enum, so it is logged as a number, not with %s.
        wxLogError(wxT("Send error: %d"), static_cast<int>(wxSock->LastError()));
        return false;
    }
    return true;
}