From c7b15694b90dbfecfe07f3f3c1537506a90820d3 Mon Sep 17 00:00:00 2001 From: taynpg Date: Wed, 25 Jun 2025 17:06:30 +0800 Subject: [PATCH] heartbeat: add basic code. --- CMakeLists.txt | 2 +- ClientCore/ClientCore.cpp | 62 ++++++++++++++++++++++++++++++++++++++- ClientCore/ClientCore.h | 42 ++++++++++++++++++++++++++ Protocol/Protocol.h | 3 ++ Server/Server.cpp | 33 +++++++++++++++++++-- 5 files changed, 138 insertions(+), 4 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index e28fd6c..b5e8f80 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.16) -project(frelay VERSION 0.1.4 LANGUAGES CXX) +project(frelay VERSION 0.1.5 LANGUAGES CXX) set(CMAKE_CXX_STANDARD 11) set(CMAKE_CXX_STANDARD_REQUIRED ON) diff --git a/ClientCore/ClientCore.cpp b/ClientCore/ClientCore.cpp index c9fa676..ecd3f7b 100644 --- a/ClientCore/ClientCore.cpp +++ b/ClientCore/ClientCore.cpp @@ -209,6 +209,11 @@ bool ClientCore::Send(const char* data, qint64 len) return true; } +bool ClientCore::IsConnect() +{ + return connected_; +} + void ClientCore::SetRemoteID(const QString& id) { GlobalData::Ins()->SetRemoteID(id); @@ -238,7 +243,62 @@ SocketWorker::~SocketWorker() void SocketWorker::run() { - // qDebug() << "SocketWorker thread:" << QThread::currentThread(); core_->Instance(); exec(); } + +HeatBeat::HeatBeat(ClientCore* core, QObject* parent) : QThread(parent), core_(core) +{ +} + +HeatBeat::~HeatBeat() +{ + Stop(); +} + +void HeatBeat::Stop() +{ + isRun_ = false; +} + +void HeatBeat::run() +{ + isRun_ = true; + InfoMsg info; + auto frame = core_->GetBuffer(info, FBT_SER_MSG_HEARTBEAT, ""); + while (isRun_) { + QThread::sleep(1); + if (!core_->IsConnect()) { + continue; + } + ClientCore::syncInvoke(core_, frame); + } +} + +TransBeat::TransBeat(ClientCore* core, QObject* parent) : QThread(parent), core_(core) +{ +} + +TransBeat::~TransBeat() +{ + Stop(); +} + +void TransBeat::run() +{ + isRun_ = true; + InfoMsg info; + while (isRun_) { + QThread::sleep(1); + if (!core_->IsConnect()) { + continue; + } + auto frame = core_->GetBuffer(info, FBT_SER_MSG_JUDGE_OTHER_ALIVE, core_->GetRemoteID()); + ClientCore::syncInvoke(core_, frame); + } +} + +void TransBeat::Stop() +{ + isRun_ = false; +} diff --git a/ClientCore/ClientCore.h b/ClientCore/ClientCore.h index 5040a35..6e6d72e 100644 --- a/ClientCore/ClientCore.h +++ b/ClientCore/ClientCore.h @@ -34,6 +34,7 @@ public: void Disconnect(); bool Send(QSharedPointer frame); bool Send(const char* data, qint64 len); + bool IsConnect(); template bool Send(const T& info, FrameBufferType type, const QString& tid) { auto f = GetBuffer(info, type, tid); @@ -104,6 +105,7 @@ public: LocalFile localFile_; }; +// Socket Worker Thread class SocketWorker : public QThread { Q_OBJECT @@ -119,4 +121,44 @@ private: ClientCore* core_{}; }; +// HeatBeat to Server +class HeatBeat : public QThread +{ + Q_OBJECT + +public: + HeatBeat(ClientCore* core, QObject* parent = nullptr); + ~HeatBeat() override; + +public: + void Stop(); + +protected: + void run() override; + +private: + bool isRun_{false}; + ClientCore* core_{}; +}; + +// judge send client is alive or not when downloading +class TransBeat : public QThread +{ + Q_OBJECT + +public: + TransBeat(ClientCore* core, QObject* parent = nullptr); + ~TransBeat() override; + +public: + void Stop(); + +protected: + void run() override; + +private: + bool isRun_{false}; + ClientCore* core_{}; +}; + #endif // CLIENTCORE_H \ No newline at end of file diff --git a/Protocol/Protocol.h b/Protocol/Protocol.h index db3bca4..d52d8f5 100644 --- a/Protocol/Protocol.h +++ b/Protocol/Protocol.h @@ -14,10 +14,13 @@ constexpr quint32 CHUNK_BUF_SIZE = 1 * 1024 * 1024; // Contents beyond 30 are only forwarded. enum FrameBufferType : uint16_t { FBT_NONE = 0, + FBT_SER_MSG_HEARTBEAT, FBT_SER_MSG_ASKCLIENTS, FBT_SER_MSG_YOURID, FBT_SER_MSG_RESPONSE, FBT_SER_MSG_FORWARD_FAILED, + FBT_SER_MSG_JUDGE_OTHER_ALIVE, + FBT_SER_MSG_OFFLINE, FBT_CLI_BIN_FILEDATA = 31, FBT_CLI_MSG_COMMUNICATE, FBT_CLI_ASK_DIRFILE, diff --git a/Server/Server.cpp b/Server/Server.cpp index 1fd75b0..8c0f6a0 100644 --- a/Server/Server.cpp +++ b/Server/Server.cpp @@ -4,6 +4,7 @@ #include #include "InfoClient.h" +#include "InfoMsg.h" #include "InfoPack.hpp" Server::Server(QObject* parent) : QTcpServer(parent) @@ -174,7 +175,36 @@ void Server::replyRequest(QSharedPointer client, QSharedPointertid = client->id; replyFrame->data = clientList; auto ret = sendData(client->socket, replyFrame); - qDebug() << "Reply client list:" << client->id << ret; + break; + } + case FBT_SER_MSG_HEARTBEAT: { + QSharedPointer cl; + { + QReadLocker locker(&rwLock_); + if (clients_.count(frame->fid)) { + cl = clients_.value(frame->fid); + } + } + if (cl) { + cl->connectTime = QDateTime::currentDateTime().toMSecsSinceEpoch() / 1000; + } + break; + } + case FBT_SER_MSG_JUDGE_OTHER_ALIVE: { + QSharedPointer cl; + { + QReadLocker locker(&rwLock_); + if (clients_.count(frame->tid)) { + cl = clients_.value(frame->tid); + } + } + if (!cl) { + auto rf = QSharedPointer::create(); + rf->type = FBT_SER_MSG_OFFLINE; + rf->fid = id_; + rf->tid = frame->fid; + sendData(client->socket, rf); + } break; } default: @@ -199,7 +229,6 @@ bool Server::sendData(QTcpSocket* socket, QSharedPointer frame) void Server::monitorClients() { - // qint64 now = QDateTime::currentSecsSinceEpoch(); qint64 now = QDateTime::currentDateTime().toMSecsSinceEpoch() / 1000; QWriteLocker locker(&rwLock_);