diff --git a/Information/InfoCommunicate.hpp b/Information/InfoCommunicate.hpp new file mode 100644 index 0000000..09d6737 --- /dev/null +++ b/Information/InfoCommunicate.hpp @@ -0,0 +1,24 @@ +#ifndef INFOCOMMUNICATE_HPP +#define INFOCOMMUNICATE_HPP + +#include +#include +#include + +enum MessageType { + MSG_TYPE_ASK_CLIENTS = 1, +}; + +struct InfoCommunicate { + MessageType type; + std::string toID; + std::string UUID; + std::string data; + char mark{}; + template void serialize(Archive& archive) + { + archive(CEREAL_NVP(type), CEREAL_NVP(toID), CEREAL_NVP(UUID), CEREAL_NVP(data), CEREAL_NVP(mark)); + } +}; + +#endif // INFOCOMMUNICATE_HPP \ No newline at end of file diff --git a/Protocol/CMakeLists.txt b/Protocol/CMakeLists.txt index ee9c9e6..d10b21e 100644 --- a/Protocol/CMakeLists.txt +++ b/Protocol/CMakeLists.txt @@ -12,4 +12,5 @@ Communicate.cxx ) add_library(Protocol STATIC ${MSOURCES}) -target_link_libraries(Protocol PRIVATE wx::base wx::core) \ No newline at end of file +target_link_libraries(Protocol PRIVATE wx::base wx::core Util) +target_include_directories(Protocol PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) \ No newline at end of file diff --git a/Protocol/Communicate.cxx b/Protocol/Communicate.cxx index d6c8b7e..e329a97 100644 --- a/Protocol/Communicate.cxx +++ b/Protocol/Communicate.cxx @@ -1,5 +1,76 @@ #include "Communicate.h" +constexpr unsigned char gHeader[] = {0xFF, 0xFE}; +constexpr unsigned char gTail[] = {0xFF, 0xFF}; + Communicate::Communicate() { -} \ No newline at end of file +} + +/* +【 transm TCP 数据协议 】 + header 2 char: 0xFF 0xFE + from 32 char: + to 32 char: + len 4 char: + data xxxxx: + tail 2 char: 0xFF 0xFF + */ +FrameBuffer* Communicate::ParseBuffer(MutBuffer& buffer) +{ + FrameBuffer* frame = nullptr; + + int find = buffer.IndexOf((const char*)gHeader, sizeof(gHeader)); + if (find < 0) { + return frame; + } + + int len = 0; + std::memcpy(&len, buffer.GetData() + find + sizeof(gHeader) + 64, sizeof(len)); + if (buffer.Length() < (find + sizeof(gHeader) + 64 + len + sizeof(len) + sizeof(gTail)) || len < 0) { + return frame; + } + + if (std::memcmp(buffer.GetData() + find + sizeof(gHeader) + 64 + sizeof(len) + len, gTail, sizeof(gTail)) != 0) { + return frame; + } + + frame = new FrameBuffer(); + frame->fid = std::string(buffer.GetData() + find + sizeof(gHeader), 32); + frame->tid = std::string(buffer.GetData() + find + sizeof(gHeader) + 32, 32); + + if (len > 0) { + frame->data = new char[len]; + std::memcpy(frame->data, buffer.GetData() + find + sizeof(gHeader) + 64 + sizeof(len), len); + frame->len = len; + } + buffer.RemoveOf(0, find + sizeof(gHeader) + 64 + len + sizeof(len) + sizeof(gTail)); + return frame; +} + +bool Communicate::PackBuffer(FrameBuffer* frame, char** buf, int& len) +{ + if (frame == nullptr) { + return false; + } + if (frame->data == nullptr) { + frame->len = 0; + } + len = sizeof(gHeader) + 64 + sizeof(len) + frame->len + sizeof(gTail); + *buf = new char[len]; + std::memcpy(*buf, gHeader, sizeof(gHeader)); + std::memcpy(*buf + sizeof(gHeader), frame->fid.c_str(), 32); + std::memcpy(*buf + sizeof(gHeader) + 32, frame->tid.c_str(), 32); + std::memcpy(*buf + sizeof(gHeader) + 64, &frame->len, sizeof(len)); + if (frame->len > 0) { + std::memcpy(*buf + sizeof(gHeader) + 64 + sizeof(len), frame->data, frame->len); + } + std::memcpy(*buf + sizeof(gHeader) + 64 + sizeof(len) + frame->len, gTail, sizeof(gTail)); + return true; +} + +FrameBuffer::~FrameBuffer() +{ + delete[] data; + len = 0; +} diff --git a/Protocol/Communicate.h b/Protocol/Communicate.h index 6605e2a..65c4d48 100644 --- a/Protocol/Communicate.h +++ b/Protocol/Communicate.h @@ -1,10 +1,24 @@ #ifndef COMMUNICATE_H #define COMMUNICATE_H +#include + +struct FrameBuffer { + ~FrameBuffer(); + std::string fid; + std::string tid; + char* data{}; + int len{}; +}; + class Communicate { public: Communicate(); + +public: + static FrameBuffer* ParseBuffer(MutBuffer& buffer); + static bool PackBuffer(FrameBuffer* frame, char** buf, int& len); }; #endif // COMMUNICATE_H \ No newline at end of file diff --git a/RelayServer/CMakeLists.txt b/RelayServer/CMakeLists.txt index bcfa1a8..a9aa809 100644 --- a/RelayServer/CMakeLists.txt +++ b/RelayServer/CMakeLists.txt @@ -13,5 +13,5 @@ main.cxx ) add_executable(RelayServer ${MSOURCES}) -target_link_libraries(RelayServer PRIVATE wx::base wx::core Util) -set_target_properties(RelayServer PROPERTIES WIN32_EXECUTABLE TRUE) \ No newline at end of file +target_link_libraries(RelayServer PRIVATE wx::base wx::core Util Protocol wx::net) +# set_target_properties(RelayServer PROPERTIES WIN32_EXECUTABLE TRUE) \ No newline at end of file diff --git a/RelayServer/RelayServer.cxx b/RelayServer/RelayServer.cxx index 10bb962..5c87204 100644 --- a/RelayServer/RelayServer.cxx +++ b/RelayServer/RelayServer.cxx @@ -1,4 +1,6 @@ #include "RelayServer.h" +#include +#include RemoteServer::RemoteServer() { @@ -6,18 +8,121 @@ RemoteServer::RemoteServer() bool RemoteServer::Init(const wxString& ip, unsigned short port) { - return false; + thRun_= true; + wxIPV4address addr; + + if (!addr.Hostname(ip)) { + wxLogError(wxT("Invalid IP address: %s"), ip); + return false; + } + + addr.Service(port); + server_ = std::make_unique(addr); + if (!server_->IsOk()) { + wxLogError(wxT("Failed to create server socket.")); + return false; + } + if (!server_->GetLocal(addr)) { + wxLogError(wxT("Failed to get local address.")); + return false; + } + wxLogMessage(wxT("Server socket created on %s:%d"), addr.IPAddress(), addr.Service()); + //wxLogInfo(wxT("Server socket created on %s:%d"), addr.IPAddress(), addr.Service()); + + serverId_ = wxNewId(); + server_->SetFlags(wxSOCKET_WAITALL); + server_->SetEventHandler(*this, serverId_); + + server_->SetNotify(wxSOCKET_CONNECTION_FLAG | wxSOCKET_LOST_FLAG); + server_->Notify(true); + Bind(wxEVT_SOCKET, &RemoteServer::OnServerEvent, this, serverId_); + + return true; } int RemoteServer::Run() { - return 0; + wxEventLoop loop; + return loop.Run(); } void RemoteServer::OnServerEvent(wxSocketEvent& event) { + auto* sock = event.GetSocket(); + switch (event.GetSocketEvent()) { + case wxSOCKET_CONNECTION: { + auto newer = std::shared_ptr(server_->Accept(false)); + if (!newer) { + wxLogError(wxT("Failed to accept client connection.")); + return; + } + wxIPV4address addr; + newer->GetPeer(addr); + wxString id = wxString::Format("%s:%d", addr.IPAddress(), addr.Service()); + wxLogMessage(wxT("Client connected: %s"), id); + + std::unique_lock lock(clientsMutex_); + auto client = std::make_shared(); + client->wxSock = newer; + client->onlineTime = + std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + client->lastRecvTime = std::chrono::high_resolution_clock::now(); + clients_[id] = client; + threads_[id] = std::thread(&RemoteServer::thClientThread, this, newer, id); + break; + } + case wxSOCKET_LOST: { + wxIPV4address addr; + sock->GetPeer(addr); + wxString id = wxString::Format("%s:%d", addr.IPAddress(), addr.Service()); + wxLogMessage(wxT("Client disconnected: %s"), id); + std::unique_lock lock(clientsMutex_); + if (clients_.find(id) != clients_.end()) { + clients_.erase(id); + } + if (threads_.find(id) != threads_.end()) { + threads_[id].detach(); + threads_.erase(id); + } + break; + } + default: + break; + } } void RemoteServer::thClientThread(const std::shared_ptr& wxSock, const wxString& id) { + wxLogMessage(wxT("Client thread started: %s"), id); + std::shared_ptr client = nullptr; + + { + std::shared_lock lock(clientsMutex_); + client = clients_[id]; + } + + InfoCommunicate info; + while (thRun_) { + wxSock->Read(client->buf.data(), gBufferSize); + auto br = wxSock->LastCount(); + if (br == 0) { + wxLogMessage(wxT("Client disconnected: %s"), id); + break; + } else if (wxSock->Error()) { + wxLogMessage(wxT("%s Client error: %s"), id, wxSock->LastError()); + break; + } + client->buffer.Push(client->buf.data(), br); + while (true) { + auto* frame = Communicate::ParseBuffer(client->buffer); + if (!frame) { + break; + } + std::stringstream ss; + ss.write(frame->data, frame->len); + cereal::BinaryInputArchive inputArchive(ss); + inputArchive(info); + delete frame; + } + } } diff --git a/RelayServer/RelayServer.h b/RelayServer/RelayServer.h index eb8257a..b85a598 100644 --- a/RelayServer/RelayServer.h +++ b/RelayServer/RelayServer.h @@ -14,9 +14,12 @@ #include #include +// constexpr int gBufferSize = 1024 * 1024; +constexpr int gBufferSize = 256; using highClock_t = std::chrono::time_point; struct TranClient { std::shared_ptr wxSock; + std::array buf; MutBuffer buffer; int64_t onlineTime; highClock_t lastRecvTime; @@ -36,6 +39,7 @@ private: void thClientThread(const std::shared_ptr& wxSock, const wxString& id); private: + bool thRun_{false}; wxWindowID serverId_; std::shared_mutex clientsMutex_; std::unique_ptr server_; diff --git a/RelayServer/main.cxx b/RelayServer/main.cxx index fdafde3..6b93c40 100644 --- a/RelayServer/main.cxx +++ b/RelayServer/main.cxx @@ -1,14 +1,15 @@ #include "RelayServer.h" -class RelayServerApp : public wxApp +int main() { -public: - bool OnInit() override; -}; - -bool RelayServerApp::OnInit() -{ - return true; -} - -wxIMPLEMENT_APP(RelayServerApp); \ No newline at end of file + wxInitializer initializer; + if (!initializer.IsOk()) { + fprintf(stderr, "Failed to initialize the wxWidgets library, aborting."); + return -1; + } + auto server_ = std::make_unique(); + if (server_->Init("127.0.0.1", 8080)) { + server_->Run(); + } + return 0; +} \ No newline at end of file