From 0ac77752c98937ed7a885f38008b5fc6074d42f5 Mon Sep 17 00:00:00 2001 From: taynpg Date: Sun, 9 Nov 2025 11:53:59 +0800 Subject: [PATCH] =?UTF-8?q?flow=EF=BC=9A=E6=B5=81=E9=87=8F=E9=99=90?= =?UTF-8?q?=E5=AE=9A=E5=88=9D=E6=AD=A5=E8=B0=83=E8=AF=95=E5=AE=8C=E6=88=90?= =?UTF-8?q?=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ClientCore/ClientCore.cpp | 4 +++ ClientCore/ClientCore.h | 1 + ClientCore/FileTrans.cpp | 28 +++++++++++++++- ClientCore/FileTrans.h | 5 +++ Protocol/Protocol.h | 21 +++++++++++- Server/Server.cpp | 68 ++++++++++++++++++++++++++++++++++++--- Server/Server.h | 5 +++ 7 files changed, 125 insertions(+), 7 deletions(-) diff --git a/ClientCore/ClientCore.cpp b/ClientCore/ClientCore.cpp index 3cffebd..d77f262 100644 --- a/ClientCore/ClientCore.cpp +++ b/ClientCore/ClientCore.cpp @@ -223,6 +223,10 @@ void ClientCore::UseFrame(QSharedPointer frame) emit sigMsgAnswer(frame); break; } + case FBT_SER_FLOW_LIMIT: { + emit sigFlowBack(frame); + break; + } default: qCritical() << QString("未知的帧类型: %1").arg(frame->type); break; diff --git a/ClientCore/ClientCore.h b/ClientCore/ClientCore.h index 33aa6a6..581d1cc 100644 --- a/ClientCore/ClientCore.h +++ b/ClientCore/ClientCore.h @@ -80,6 +80,7 @@ signals: void sigYourId(QSharedPointer frame); void sigMsgAsk(QSharedPointer frame); void sigMsgAnswer(QSharedPointer frame); + void sigFlowBack(QSharedPointer frame); signals: void conSuccess(); diff --git a/ClientCore/FileTrans.cpp b/ClientCore/FileTrans.cpp index 563cfa4..74048f1 100644 --- a/ClientCore/FileTrans.cpp +++ b/ClientCore/FileTrans.cpp @@ -1,5 +1,6 @@ #include "FileTrans.h" +#include #include #include @@ -362,6 +363,8 @@ void FileTrans::SendFile(const QSharedPointer& task) { auto* sendThread = new SendThread(clientCore_); sendThread->setTask(task); + connect(clientCore_, &ClientCore::sigFlowBack, sendThread, &SendThread::fbtFlowBack); + QMutexLocker locker(&sthMut_); upTasks_[task->task.localId] = sendThread; sendThread->run(); @@ -371,13 +374,30 @@ SendThread::SendThread(ClientCore* clientCore) : cliCore_(clientCore) { } +// 发送速度控制 +void SendThread::fbtFlowBack(QSharedPointer frame) +{ + auto msg = infoUnpack(frame->data); + delay_ = msg.mark * BLOCK_LEVEL_MULTIPLE; +} + +// constexpr int SendThread::sendDelay() +// { +// // 这里限定,frlay的最大发送速度,目前是 200MB/s。 +// constexpr int MAX_SEND_SPD = 1024 * 1024 * 200; +// constexpr double DELAY = (static_cast(CHUNK_BUF_SIZE) / MAX_SEND_SPD) * 1000; +// int minDelay = qMax(static_cast(DELAY), 1); +// return minDelay; +// } + void SendThread::run() { // task's file shoule be already opened. isSuccess_ = true; + delay_ = 0; bool invokeSuccess = false; + qInfo() << tr("开始发送文件:") << task_->file.fileName(); while (!task_->file.atEnd()) { - auto frame = QSharedPointer::create(); frame->tid = task_->task.remoteId; frame->type = FBT_CLI_FILE_BUFFER; @@ -390,6 +410,9 @@ void SendThread::run() break; } frame->data.resize(br); + if (delay_ > 0) { + QThread::msleep(delay_); + } invokeSuccess = QMetaObject::invokeMethod(cliCore_, "SendFrame", Qt::BlockingQueuedConnection, Q_RETURN_ARG(bool, isSuccess_), Q_ARG(QSharedPointer, frame)); if (!invokeSuccess || !isSuccess_) { @@ -397,7 +420,10 @@ void SendThread::run() break; } task_->tranSize += frame->data.size(); + // 关键点:这里不调用,无法中途收到别人发的数据。 + QCoreApplication::processEvents(); } + qInfo() << tr("结束发送文件:") << task_->file.fileName(); InfoMsg info; auto f = cliCore_->GetBuffer(info, FBT_CLI_TRANS_DONE, task_->task.remoteId); ClientCore::syncInvoke(cliCore_, f); diff --git a/ClientCore/FileTrans.h b/ClientCore/FileTrans.h index 2ac2cfc..581fd9f 100644 --- a/ClientCore/FileTrans.h +++ b/ClientCore/FileTrans.h @@ -48,11 +48,16 @@ public: public: void run() override; void setTask(const QSharedPointer& task); + void fbtFlowBack(QSharedPointer frame); + +// private: +// static constexpr int sendDelay(); private: bool isSuccess_{false}; ClientCore* cliCore_; quint32 curSendCount_{0}; + qint32 delay_{}; QSharedPointer task_; }; diff --git a/Protocol/Protocol.h b/Protocol/Protocol.h index 7ae650e..e0c1f53 100644 --- a/Protocol/Protocol.h +++ b/Protocol/Protocol.h @@ -7,7 +7,12 @@ #include #include -constexpr quint32 CHUNK_BUF_SIZE = 1 * 1024 * 1024; +// 一帧包大小 +constexpr quint32 CHUNK_BUF_SIZE = 1 * 1024 * 512; +// 流量背压倍率 +constexpr quint32 FLOW_BACK_MULTIPLE = 50; +// 阻塞等级放大倍率 +constexpr quint32 BLOCK_LEVEL_MULTIPLE = 5; // It is specified here that the first 30 contents (inclusive) are // used for communication with the server. @@ -21,6 +26,7 @@ enum FrameBufferType : uint16_t { FBT_SER_MSG_FORWARD_FAILED, FBT_SER_MSG_JUDGE_OTHER_ALIVE, FBT_SER_MSG_OFFLINE, + FBT_SER_FLOW_LIMIT, FBT_CLI_BIN_FILEDATA = 31, FBT_CLI_MSG_COMMUNICATE, FBT_CLI_ASK_DIRFILE, @@ -49,6 +55,19 @@ struct FrameBuffer { bool sendRet; }; +// 拥堵等级,越高越堵 +enum BlockLevel { + BL_LEVEL_NORMAL = 0, + BL_LEVEL_1 = 1, + BL_LEVEL_2 = 3, + BL_LEVEL_3 = 5, + BL_LEVEL_4 = 10, + BL_LEVEL_5 = 20, + BL_LEVEL_6 = 50, + BL_LEVEL_7 = 100, + BL_LEVEL_8 = 1000 +}; + class Protocol { public: diff --git a/Server/Server.cpp b/Server/Server.cpp index 4f89338..3df66ed 100644 --- a/Server/Server.cpp +++ b/Server/Server.cpp @@ -76,6 +76,7 @@ void Server::onNewConnection() { QWriteLocker locker(&rwLock_); clients_.insert(clientId, client); + flowBackCount_[clientId] = 0; } qInfo() << "Client connected:" << clientId; @@ -116,7 +117,7 @@ void Server::processClientData(QSharedPointer client) } frame->fid = client->id; if (frame->type <= 30) { - //frame->tid = "server"; + // frame->tid = "server"; replyRequest(client, frame); } else { if (!forwardData(client, frame)) { @@ -126,17 +127,41 @@ void Server::processClientData(QSharedPointer client) } } +bool Server::sendWithFlowCheck(QTcpSocket* fsoc, QTcpSocket* tsoc, QSharedPointer frame) +{ + auto flowLimit = [this](QTcpSocket* fsoc, BlockLevel aLevel) { + InfoMsg msg; + msg.mark = static_cast(aLevel); + auto f = QSharedPointer::create(); + f->type = FBT_SER_FLOW_LIMIT; + f->data = infoPack(msg); + if (!sendData(fsoc, f)) { + qWarning() << "Failed to send flow limit message to" << fsoc->property("clientId").toString(); + } + }; + + if (flowBackCount_[fsoc->property("clientId").toString()] > FLOW_BACK_MULTIPLE) { + auto level = getBlockLevel(tsoc); + flowLimit(fsoc, level); + //qDebug() << "Flow back count exceeded, block level:" << level; + } + flowBackCount_[fsoc->property("clientId").toString()]++; + return sendData(tsoc, frame); +} + bool Server::forwardData(QSharedPointer client, QSharedPointer frame) { QSharedPointer targetClient; + QSharedPointer fromClient; { QReadLocker locker(&rwLock_); targetClient = clients_.value(frame->tid); + fromClient = clients_.value(frame->fid); } - if (targetClient) { - return sendData(targetClient->socket, frame); + if (targetClient && fromClient) { + return sendWithFlowCheck(fromClient->socket, targetClient->socket, frame); } else { auto errorFrame = QSharedPointer::create(); errorFrame->type = FBT_SER_MSG_FORWARD_FAILED; @@ -169,7 +194,7 @@ void Server::replyRequest(QSharedPointer client, QSharedPointerid << "heartbeat received"; + // qDebug() << "Client" << cl->id << "heartbeat received"; cl->connectTime = QDateTime::currentDateTime().toMSecsSinceEpoch() / 1000; } break; @@ -207,10 +232,40 @@ bool Server::sendData(QTcpSocket* socket, QSharedPointer frame) if (data.isEmpty()) { return false; } - return socket->write(data) == data.size(); } +BlockLevel Server::getBlockLevel(QTcpSocket* socket) +{ + auto b = socket->bytesToWrite(); + constexpr auto one = CHUNK_BUF_SIZE; + if (b > one * 1000) { + return BL_LEVEL_8; + } + if (b > one * 200) { + return BL_LEVEL_7; + } + if (b > one * 100) { + return BL_LEVEL_6; + } + if (b > one * 50) { + return BL_LEVEL_5; + } + if (b > one * 30) { + return BL_LEVEL_4; + } + if (b > one * 10) { + return BL_LEVEL_3; + } + if (b > one * 5) { + return BL_LEVEL_2; + } + if (b > one * 1) { + return BL_LEVEL_1; + } + return BL_LEVEL_NORMAL; +} + void Server::onClientDisconnected() { QTcpSocket* socket = qobject_cast(sender()); @@ -225,6 +280,9 @@ void Server::onClientDisconnected() if (clients_.count(clientId)) { clients_.remove(clientId); } + if (flowBackCount_.count(clientId)) { + flowBackCount_.remove(clientId); + } } qInfo() << "Client disconnected:" << __LINE__ << clientId; diff --git a/Server/Server.h b/Server/Server.h index c95e797..8a1fb88 100644 --- a/Server/Server.h +++ b/Server/Server.h @@ -42,8 +42,13 @@ private: void replyRequest(QSharedPointer client, QSharedPointer frame); bool sendData(QTcpSocket* socket, QSharedPointer frame); + // 流量控制 + bool sendWithFlowCheck(QTcpSocket* fsoc, QTcpSocket* tsoc, QSharedPointer frame); + BlockLevel getBlockLevel(QTcpSocket* socket); + QString id_; QMap> clients_; + QMap flowBackCount_; QReadWriteLock rwLock_; QTimer* monitorTimer_; };