flow:流量限定初步调试完成。

This commit is contained in:
2025-11-09 11:53:59 +08:00
parent 089d542a8d
commit 0ac77752c9
7 changed files with 125 additions and 7 deletions

View File

@@ -223,6 +223,10 @@ void ClientCore::UseFrame(QSharedPointer<FrameBuffer> frame)
emit sigMsgAnswer(frame);
break;
}
case FBT_SER_FLOW_LIMIT: {
emit sigFlowBack(frame);
break;
}
default:
qCritical() << QString("未知的帧类型: %1").arg(frame->type);
break;

View File

@@ -80,6 +80,7 @@ signals:
void sigYourId(QSharedPointer<FrameBuffer> frame);
void sigMsgAsk(QSharedPointer<FrameBuffer> frame);
void sigMsgAnswer(QSharedPointer<FrameBuffer> frame);
void sigFlowBack(QSharedPointer<FrameBuffer> frame);
signals:
void conSuccess();

View File

@@ -1,5 +1,6 @@
#include "FileTrans.h"
#include <QCoreApplication>
#include <QDir>
#include <QFileInfo>
@@ -362,6 +363,8 @@ void FileTrans::SendFile(const QSharedPointer<DoTransTask>& task)
{
auto* sendThread = new SendThread(clientCore_);
sendThread->setTask(task);
connect(clientCore_, &ClientCore::sigFlowBack, sendThread, &SendThread::fbtFlowBack);
QMutexLocker locker(&sthMut_);
upTasks_[task->task.localId] = sendThread;
sendThread->run();
@@ -371,13 +374,30 @@ SendThread::SendThread(ClientCore* clientCore) : cliCore_(clientCore)
{
}
// 发送速度控制
void SendThread::fbtFlowBack(QSharedPointer<FrameBuffer> frame)
{
auto msg = infoUnpack<InfoMsg>(frame->data);
delay_ = msg.mark * BLOCK_LEVEL_MULTIPLE;
}
// constexpr int SendThread::sendDelay()
// {
// // 这里限定,frlay的最大发送速度,目前是 200MB/s。
// constexpr int MAX_SEND_SPD = 1024 * 1024 * 200;
// constexpr double DELAY = (static_cast<double>(CHUNK_BUF_SIZE) / MAX_SEND_SPD) * 1000;
// int minDelay = qMax(static_cast<int>(DELAY), 1);
// return minDelay;
// }
void SendThread::run()
{
// task's file shoule be already opened.
isSuccess_ = true;
delay_ = 0;
bool invokeSuccess = false;
qInfo() << tr("开始发送文件:") << task_->file.fileName();
while (!task_->file.atEnd()) {
auto frame = QSharedPointer<FrameBuffer>::create();
frame->tid = task_->task.remoteId;
frame->type = FBT_CLI_FILE_BUFFER;
@@ -390,6 +410,9 @@ void SendThread::run()
break;
}
frame->data.resize(br);
if (delay_ > 0) {
QThread::msleep(delay_);
}
invokeSuccess = QMetaObject::invokeMethod(cliCore_, "SendFrame", Qt::BlockingQueuedConnection,
Q_RETURN_ARG(bool, isSuccess_), Q_ARG(QSharedPointer<FrameBuffer>, frame));
if (!invokeSuccess || !isSuccess_) {
@@ -397,7 +420,10 @@ void SendThread::run()
break;
}
task_->tranSize += frame->data.size();
// 关键点:这里不调用,无法中途收到别人发的数据。
QCoreApplication::processEvents();
}
qInfo() << tr("结束发送文件:") << task_->file.fileName();
InfoMsg info;
auto f = cliCore_->GetBuffer(info, FBT_CLI_TRANS_DONE, task_->task.remoteId);
ClientCore::syncInvoke(cliCore_, f);

View File

@@ -48,11 +48,16 @@ public:
public:
void run() override;
void setTask(const QSharedPointer<DoTransTask>& task);
void fbtFlowBack(QSharedPointer<FrameBuffer> frame);
// private:
// static constexpr int sendDelay();
private:
bool isSuccess_{false};
ClientCore* cliCore_;
quint32 curSendCount_{0};
qint32 delay_{};
QSharedPointer<DoTransTask> task_;
};

View File

@@ -7,7 +7,12 @@
#include <QString>
#include <functional>
constexpr quint32 CHUNK_BUF_SIZE = 1 * 1024 * 1024;
// 一帧包大小
constexpr quint32 CHUNK_BUF_SIZE = 1 * 1024 * 512;
// 流量背压倍率
constexpr quint32 FLOW_BACK_MULTIPLE = 50;
// 阻塞等级放大倍率
constexpr quint32 BLOCK_LEVEL_MULTIPLE = 5;
// It is specified here that the first 30 contents (inclusive) are
// used for communication with the server.
@@ -21,6 +26,7 @@ enum FrameBufferType : uint16_t {
FBT_SER_MSG_FORWARD_FAILED,
FBT_SER_MSG_JUDGE_OTHER_ALIVE,
FBT_SER_MSG_OFFLINE,
FBT_SER_FLOW_LIMIT,
FBT_CLI_BIN_FILEDATA = 31,
FBT_CLI_MSG_COMMUNICATE,
FBT_CLI_ASK_DIRFILE,
@@ -49,6 +55,19 @@ struct FrameBuffer {
bool sendRet;
};
// 拥堵等级,越高越堵
enum BlockLevel {
BL_LEVEL_NORMAL = 0,
BL_LEVEL_1 = 1,
BL_LEVEL_2 = 3,
BL_LEVEL_3 = 5,
BL_LEVEL_4 = 10,
BL_LEVEL_5 = 20,
BL_LEVEL_6 = 50,
BL_LEVEL_7 = 100,
BL_LEVEL_8 = 1000
};
class Protocol
{
public:

View File

@@ -76,6 +76,7 @@ void Server::onNewConnection()
{
QWriteLocker locker(&rwLock_);
clients_.insert(clientId, client);
flowBackCount_[clientId] = 0;
}
qInfo() << "Client connected:" << clientId;
@@ -116,7 +117,7 @@ void Server::processClientData(QSharedPointer<ClientInfo> client)
}
frame->fid = client->id;
if (frame->type <= 30) {
//frame->tid = "server";
// frame->tid = "server";
replyRequest(client, frame);
} else {
if (!forwardData(client, frame)) {
@@ -126,17 +127,41 @@ void Server::processClientData(QSharedPointer<ClientInfo> client)
}
}
bool Server::sendWithFlowCheck(QTcpSocket* fsoc, QTcpSocket* tsoc, QSharedPointer<FrameBuffer> frame)
{
auto flowLimit = [this](QTcpSocket* fsoc, BlockLevel aLevel) {
InfoMsg msg;
msg.mark = static_cast<qint32>(aLevel);
auto f = QSharedPointer<FrameBuffer>::create();
f->type = FBT_SER_FLOW_LIMIT;
f->data = infoPack<InfoMsg>(msg);
if (!sendData(fsoc, f)) {
qWarning() << "Failed to send flow limit message to" << fsoc->property("clientId").toString();
}
};
if (flowBackCount_[fsoc->property("clientId").toString()] > FLOW_BACK_MULTIPLE) {
auto level = getBlockLevel(tsoc);
flowLimit(fsoc, level);
//qDebug() << "Flow back count exceeded, block level:" << level;
}
flowBackCount_[fsoc->property("clientId").toString()]++;
return sendData(tsoc, frame);
}
bool Server::forwardData(QSharedPointer<ClientInfo> client, QSharedPointer<FrameBuffer> frame)
{
QSharedPointer<ClientInfo> targetClient;
QSharedPointer<ClientInfo> fromClient;
{
QReadLocker locker(&rwLock_);
targetClient = clients_.value(frame->tid);
fromClient = clients_.value(frame->fid);
}
if (targetClient) {
return sendData(targetClient->socket, frame);
if (targetClient && fromClient) {
return sendWithFlowCheck(fromClient->socket, targetClient->socket, frame);
} else {
auto errorFrame = QSharedPointer<FrameBuffer>::create();
errorFrame->type = FBT_SER_MSG_FORWARD_FAILED;
@@ -169,7 +194,7 @@ void Server::replyRequest(QSharedPointer<ClientInfo> client, QSharedPointer<Fram
}
}
if (cl) {
//qDebug() << "Client" << cl->id << "heartbeat received";
// qDebug() << "Client" << cl->id << "heartbeat received";
cl->connectTime = QDateTime::currentDateTime().toMSecsSinceEpoch() / 1000;
}
break;
@@ -207,10 +232,40 @@ bool Server::sendData(QTcpSocket* socket, QSharedPointer<FrameBuffer> frame)
if (data.isEmpty()) {
return false;
}
return socket->write(data) == data.size();
}
BlockLevel Server::getBlockLevel(QTcpSocket* socket)
{
auto b = socket->bytesToWrite();
constexpr auto one = CHUNK_BUF_SIZE;
if (b > one * 1000) {
return BL_LEVEL_8;
}
if (b > one * 200) {
return BL_LEVEL_7;
}
if (b > one * 100) {
return BL_LEVEL_6;
}
if (b > one * 50) {
return BL_LEVEL_5;
}
if (b > one * 30) {
return BL_LEVEL_4;
}
if (b > one * 10) {
return BL_LEVEL_3;
}
if (b > one * 5) {
return BL_LEVEL_2;
}
if (b > one * 1) {
return BL_LEVEL_1;
}
return BL_LEVEL_NORMAL;
}
void Server::onClientDisconnected()
{
QTcpSocket* socket = qobject_cast<QTcpSocket*>(sender());
@@ -225,6 +280,9 @@ void Server::onClientDisconnected()
if (clients_.count(clientId)) {
clients_.remove(clientId);
}
if (flowBackCount_.count(clientId)) {
flowBackCount_.remove(clientId);
}
}
qInfo() << "Client disconnected:" << __LINE__ << clientId;

View File

@@ -42,8 +42,13 @@ private:
void replyRequest(QSharedPointer<ClientInfo> client, QSharedPointer<FrameBuffer> frame);
bool sendData(QTcpSocket* socket, QSharedPointer<FrameBuffer> frame);
// 流量控制
bool sendWithFlowCheck(QTcpSocket* fsoc, QTcpSocket* tsoc, QSharedPointer<FrameBuffer> frame);
BlockLevel getBlockLevel(QTcpSocket* socket);
QString id_;
QMap<QString, QSharedPointer<ClientInfo>> clients_;
QMap<QString, int> flowBackCount_;
QReadWriteLock rwLock_;
QTimer* monitorTimer_;
};