heartbeat: add basic code.

This commit is contained in:
2025-06-25 17:06:30 +08:00
parent 06bf31e8bd
commit c7b15694b9
5 changed files with 138 additions and 4 deletions

View File

@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.16) cmake_minimum_required(VERSION 3.16)
project(frelay VERSION 0.1.4 LANGUAGES CXX) project(frelay VERSION 0.1.5 LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 11) set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_CXX_STANDARD_REQUIRED ON)

View File

@@ -209,6 +209,11 @@ bool ClientCore::Send(const char* data, qint64 len)
return true; return true;
} }
bool ClientCore::IsConnect()
{
return connected_;
}
void ClientCore::SetRemoteID(const QString& id) void ClientCore::SetRemoteID(const QString& id)
{ {
GlobalData::Ins()->SetRemoteID(id); GlobalData::Ins()->SetRemoteID(id);
@@ -238,7 +243,62 @@ SocketWorker::~SocketWorker()
void SocketWorker::run() void SocketWorker::run()
{ {
// qDebug() << "SocketWorker thread:" << QThread::currentThread();
core_->Instance(); core_->Instance();
exec(); exec();
} }
HeatBeat::HeatBeat(ClientCore* core, QObject* parent) : QThread(parent), core_(core)
{
}
HeatBeat::~HeatBeat()
{
Stop();
}
void HeatBeat::Stop()
{
isRun_ = false;
}
void HeatBeat::run()
{
isRun_ = true;
InfoMsg info;
auto frame = core_->GetBuffer(info, FBT_SER_MSG_HEARTBEAT, "");
while (isRun_) {
QThread::sleep(1);
if (!core_->IsConnect()) {
continue;
}
ClientCore::syncInvoke(core_, frame);
}
}
TransBeat::TransBeat(ClientCore* core, QObject* parent) : QThread(parent), core_(core)
{
}
TransBeat::~TransBeat()
{
Stop();
}
void TransBeat::run()
{
isRun_ = true;
InfoMsg info;
while (isRun_) {
QThread::sleep(1);
if (!core_->IsConnect()) {
continue;
}
auto frame = core_->GetBuffer(info, FBT_SER_MSG_JUDGE_OTHER_ALIVE, core_->GetRemoteID());
ClientCore::syncInvoke(core_, frame);
}
}
void TransBeat::Stop()
{
isRun_ = false;
}

View File

@@ -34,6 +34,7 @@ public:
void Disconnect(); void Disconnect();
bool Send(QSharedPointer<FrameBuffer> frame); bool Send(QSharedPointer<FrameBuffer> frame);
bool Send(const char* data, qint64 len); bool Send(const char* data, qint64 len);
bool IsConnect();
template <typename T> bool Send(const T& info, FrameBufferType type, const QString& tid) template <typename T> bool Send(const T& info, FrameBufferType type, const QString& tid)
{ {
auto f = GetBuffer<T>(info, type, tid); auto f = GetBuffer<T>(info, type, tid);
@@ -104,6 +105,7 @@ public:
LocalFile localFile_; LocalFile localFile_;
}; };
// Socket Worker Thread
class SocketWorker : public QThread class SocketWorker : public QThread
{ {
Q_OBJECT Q_OBJECT
@@ -119,4 +121,44 @@ private:
ClientCore* core_{}; ClientCore* core_{};
}; };
// HeatBeat to Server
class HeatBeat : public QThread
{
Q_OBJECT
public:
HeatBeat(ClientCore* core, QObject* parent = nullptr);
~HeatBeat() override;
public:
void Stop();
protected:
void run() override;
private:
bool isRun_{false};
ClientCore* core_{};
};
// judge send client is alive or not when downloading
class TransBeat : public QThread
{
Q_OBJECT
public:
TransBeat(ClientCore* core, QObject* parent = nullptr);
~TransBeat() override;
public:
void Stop();
protected:
void run() override;
private:
bool isRun_{false};
ClientCore* core_{};
};
#endif // CLIENTCORE_H #endif // CLIENTCORE_H

View File

@@ -14,10 +14,13 @@ constexpr quint32 CHUNK_BUF_SIZE = 1 * 1024 * 1024;
// Contents beyond 30 are only forwarded. // Contents beyond 30 are only forwarded.
enum FrameBufferType : uint16_t { enum FrameBufferType : uint16_t {
FBT_NONE = 0, FBT_NONE = 0,
FBT_SER_MSG_HEARTBEAT,
FBT_SER_MSG_ASKCLIENTS, FBT_SER_MSG_ASKCLIENTS,
FBT_SER_MSG_YOURID, FBT_SER_MSG_YOURID,
FBT_SER_MSG_RESPONSE, FBT_SER_MSG_RESPONSE,
FBT_SER_MSG_FORWARD_FAILED, FBT_SER_MSG_FORWARD_FAILED,
FBT_SER_MSG_JUDGE_OTHER_ALIVE,
FBT_SER_MSG_OFFLINE,
FBT_CLI_BIN_FILEDATA = 31, FBT_CLI_BIN_FILEDATA = 31,
FBT_CLI_MSG_COMMUNICATE, FBT_CLI_MSG_COMMUNICATE,
FBT_CLI_ASK_DIRFILE, FBT_CLI_ASK_DIRFILE,

View File

@@ -4,6 +4,7 @@
#include <QDebug> #include <QDebug>
#include "InfoClient.h" #include "InfoClient.h"
#include "InfoMsg.h"
#include "InfoPack.hpp" #include "InfoPack.hpp"
Server::Server(QObject* parent) : QTcpServer(parent) Server::Server(QObject* parent) : QTcpServer(parent)
@@ -174,7 +175,36 @@ void Server::replyRequest(QSharedPointer<ClientInfo> client, QSharedPointer<Fram
replyFrame->tid = client->id; replyFrame->tid = client->id;
replyFrame->data = clientList; replyFrame->data = clientList;
auto ret = sendData(client->socket, replyFrame); auto ret = sendData(client->socket, replyFrame);
qDebug() << "Reply client list:" << client->id << ret; break;
}
case FBT_SER_MSG_HEARTBEAT: {
QSharedPointer<ClientInfo> cl;
{
QReadLocker locker(&rwLock_);
if (clients_.count(frame->fid)) {
cl = clients_.value(frame->fid);
}
}
if (cl) {
cl->connectTime = QDateTime::currentDateTime().toMSecsSinceEpoch() / 1000;
}
break;
}
case FBT_SER_MSG_JUDGE_OTHER_ALIVE: {
QSharedPointer<ClientInfo> cl;
{
QReadLocker locker(&rwLock_);
if (clients_.count(frame->tid)) {
cl = clients_.value(frame->tid);
}
}
if (!cl) {
auto rf = QSharedPointer<FrameBuffer>::create();
rf->type = FBT_SER_MSG_OFFLINE;
rf->fid = id_;
rf->tid = frame->fid;
sendData(client->socket, rf);
}
break; break;
} }
default: default:
@@ -199,7 +229,6 @@ bool Server::sendData(QTcpSocket* socket, QSharedPointer<FrameBuffer> frame)
void Server::monitorClients() void Server::monitorClients()
{ {
// qint64 now = QDateTime::currentSecsSinceEpoch();
qint64 now = QDateTime::currentDateTime().toMSecsSinceEpoch() / 1000; qint64 now = QDateTime::currentDateTime().toMSecsSinceEpoch() / 1000;
QWriteLocker locker(&rwLock_); QWriteLocker locker(&rwLock_);