clientcore: Regardless of whether it is connected, the ClientCore will be uniformly managed by a new thread.

This commit is contained in:
2025-06-20 14:33:51 +08:00
parent 0334b22ad6
commit 9c012b9613
7 changed files with 100 additions and 92 deletions

View File

@@ -4,6 +4,11 @@
ClientCore::ClientCore(QObject* parent) : QObject(parent) ClientCore::ClientCore(QObject* parent) : QObject(parent)
{ {
}
void ClientCore::Instance()
{
qDebug() << "Instance() thread:" << QThread::currentThread();
socket_ = new QTcpSocket(this); socket_ = new QTcpSocket(this);
connect(socket_, &QTcpSocket::readyRead, this, &ClientCore::onReadyRead); connect(socket_, &QTcpSocket::readyRead, this, &ClientCore::onReadyRead);
connect(socket_, &QTcpSocket::disconnected, this, &ClientCore::onDisconnected); connect(socket_, &QTcpSocket::disconnected, this, &ClientCore::onDisconnected);
@@ -13,6 +18,17 @@ ClientCore::~ClientCore()
{ {
} }
void ClientCore::DoConnect(const QString& ip, quint16 port)
{
qDebug() << "doConnect thread:" << QThread::currentThread();
emit connecting();
if (!Connect(ip, port)) {
emit conFailed();
return;
}
emit conSuccess();
}
bool ClientCore::Connect(const QString& ip, quint16 port) bool ClientCore::Connect(const QString& ip, quint16 port)
{ {
QMutexLocker locker(&conMutex_); QMutexLocker locker(&conMutex_);
@@ -26,11 +42,13 @@ bool ClientCore::Connect(const QString& ip, quint16 port)
return false; return false;
} }
qInfo() << QString(tr("%1:%2 connected success.")).arg(ip).arg(port); qInfo() << QString(tr("%1:%2 connected success.")).arg(ip).arg(port);
connected_ = true;
return true; return true;
} }
void ClientCore::Disconnect() void ClientCore::Disconnect()
{ {
connected_ = false;
} }
void ClientCore::onReadyRead() void ClientCore::onReadyRead()
@@ -48,6 +66,7 @@ void ClientCore::onReadyRead()
void ClientCore::onDisconnected() void ClientCore::onDisconnected()
{ {
connected_ = false;
qCritical() << QString("client %1 disconnected...").arg(remoteID_); qCritical() << QString("client %1 disconnected...").arg(remoteID_);
emit sigDisconnect(); emit sigDisconnect();
} }
@@ -201,7 +220,6 @@ QString ClientCore::GetOwnID()
SocketWorker::SocketWorker(ClientCore* core, QObject* parent) : QThread(parent), core_(core) SocketWorker::SocketWorker(ClientCore* core, QObject* parent) : QThread(parent), core_(core)
{ {
connect(core_, &ClientCore::sigDisconnect, this, [this]() { connect(core_, &ClientCore::sigDisconnect, this, [this]() {
emit disconnected();
thread()->quit(); thread()->quit();
}); });
} }
@@ -210,19 +228,9 @@ SocketWorker::~SocketWorker()
{ {
} }
void SocketWorker::SetConnectInfo(const QString& ip, qint16 port)
{
ip_ = ip;
port_ = port;
}
void SocketWorker::run() void SocketWorker::run()
{ {
emit connecting(); qDebug() << "SocketWorker thread:" << QThread::currentThread();
if (!core_->Connect(ip_, port_)) { core_->Instance();
emit conFailed();
return;
}
emit conSuccess();
exec(); exec();
} }

View File

@@ -27,7 +27,9 @@ public:
~ClientCore(); ~ClientCore();
public: public:
void Instance();
bool Connect(const QString& ip, quint16 port); bool Connect(const QString& ip, quint16 port);
void DoConnect(const QString& ip, quint16 port);
void Disconnect(); void Disconnect();
bool Send(QSharedPointer<FrameBuffer> frame); bool Send(QSharedPointer<FrameBuffer> frame);
bool Send(const char* data, qint64 len); bool Send(const char* data, qint64 len);
@@ -44,7 +46,12 @@ public:
f->type = type; f->type = type;
return f; return f;
} }
template <typename Callable> static bool asyncInvoke(QObject* context, Callable&& func) /*
When calling syncInvoke of ClientCore, the ClientCore should not be in the GUI's event loop thread.
In other words, if a ClientCore instance is created in the GUI thread, it should be moved to another thread;
otherwise, it will cause a deadlock and freeze the interface.
*/
template <typename Callable> static bool syncInvoke(QObject* context, Callable&& func)
{ {
auto promise = QSharedPointer<QPromise<bool>>::create(); auto promise = QSharedPointer<QPromise<bool>>::create();
QFuture<bool> future = promise->future(); QFuture<bool> future = promise->future();
@@ -81,6 +88,11 @@ signals:
void sigTransFailed(QSharedPointer<FrameBuffer> frame); void sigTransFailed(QSharedPointer<FrameBuffer> frame);
void sigFileInfo(QSharedPointer<FrameBuffer> frame); void sigFileInfo(QSharedPointer<FrameBuffer> frame);
signals:
void conSuccess();
void connecting();
void conFailed();
private: private:
void onReadyRead(); void onReadyRead();
void onDisconnected(); void onDisconnected();
@@ -99,9 +111,10 @@ public:
QString remoteID_; QString remoteID_;
QMutex sockMut_; QMutex sockMut_;
QTcpSocket* socket_; QTcpSocket* socket_{};
QByteArray recvBuffer_; QByteArray recvBuffer_;
bool connected_{ false };
LocalFile localFile_; LocalFile localFile_;
}; };
@@ -113,21 +126,10 @@ public:
SocketWorker(ClientCore* core, QObject* parent = nullptr); SocketWorker(ClientCore* core, QObject* parent = nullptr);
~SocketWorker(); ~SocketWorker();
public:
void SetConnectInfo(const QString& ip, qint16 port);
protected: protected:
void run() override; void run() override;
signals:
void conSuccess();
void connecting();
void conFailed();
void disconnected();
private: private:
QString ip_;
qint16 port_{};
ClientCore* core_{}; ClientCore* core_{};
}; };

View File

@@ -49,7 +49,7 @@ void FileTrans::ReqSendFile(const TransTask& task)
} }
auto frame = clientCore_->GetBuffer(info, FBT_CLI_REQ_SEND, task.remoteId); auto frame = clientCore_->GetBuffer(info, FBT_CLI_REQ_SEND, task.remoteId);
if (!ClientCore::asyncInvoke(clientCore_, [this, frame]() { return clientCore_->Send(frame); })) { if (!ClientCore::syncInvoke(clientCore_, [this, frame]() { return clientCore_->Send(frame); })) {
qCritical() << QString(tr("send req send failed: %1")).arg(info.msg); qCritical() << QString(tr("send req send failed: %1")).arg(info.msg);
sendTask_->state = TaskState::STATE_NONE; sendTask_->state = TaskState::STATE_NONE;
sendTask_->file.close(); sendTask_->file.close();
@@ -79,7 +79,7 @@ void FileTrans::ReqDownFile(const TransTask& task)
return; return;
} }
auto frame = clientCore_->GetBuffer(info, FBT_CLI_REQ_DOWN, task.remoteId); auto frame = clientCore_->GetBuffer(info, FBT_CLI_REQ_DOWN, task.remoteId);
if (!ClientCore::asyncInvoke(clientCore_, [this, frame]() { return clientCore_->Send(frame); })) { if (!ClientCore::syncInvoke(clientCore_, [this, frame]() { return clientCore_->Send(frame); })) {
qCritical() << QString(tr("send req send failed: %1")).arg(info.msg); qCritical() << QString(tr("send req send failed: %1")).arg(info.msg);
downTask_->state = TaskState::STATE_NONE; downTask_->state = TaskState::STATE_NONE;
downTask_->file.close(); downTask_->file.close();
@@ -154,7 +154,7 @@ void FileTrans::fbtReqSend(QSharedPointer<FrameBuffer> frame)
if (downTask_->state == TaskState::STATE_RUNNING) { if (downTask_->state == TaskState::STATE_RUNNING) {
info.msg = QString(tr("busy...")); info.msg = QString(tr("busy..."));
auto f = clientCore_->GetBuffer(info, FBT_CLI_CANOT_SEND, frame->fid); auto f = clientCore_->GetBuffer(info, FBT_CLI_CANOT_SEND, frame->fid);
ClientCore::asyncInvoke(clientCore_, [this, f]() { return clientCore_->Send(f); }); ClientCore::syncInvoke(clientCore_, [this, f]() { return clientCore_->Send(f); });
return; return;
} }
@@ -165,7 +165,7 @@ void FileTrans::fbtReqSend(QSharedPointer<FrameBuffer> frame)
info.msg = QString(tr("open file failed: %1")).arg(newerPath); info.msg = QString(tr("open file failed: %1")).arg(newerPath);
qCritical() << info.msg; qCritical() << info.msg;
auto f = clientCore_->GetBuffer(info, FBT_CLI_CANOT_SEND, frame->fid); auto f = clientCore_->GetBuffer(info, FBT_CLI_CANOT_SEND, frame->fid);
if (!ClientCore::asyncInvoke(clientCore_, [this, f]() { return clientCore_->Send(f); })) { if (!ClientCore::syncInvoke(clientCore_, [this, f]() { return clientCore_->Send(f); })) {
qCritical() << QString(tr("open recv file:%2 failed, and reply %2 failed.")).arg(info.msg, f->fid); qCritical() << QString(tr("open recv file:%2 failed, and reply %2 failed.")).arg(info.msg, f->fid);
downTask_->file.close(); downTask_->file.close();
return; return;
@@ -180,7 +180,7 @@ void FileTrans::fbtReqSend(QSharedPointer<FrameBuffer> frame)
info.msg = QString(tr("open recv file success: %1")).arg(newerPath); info.msg = QString(tr("open recv file success: %1")).arg(newerPath);
qInfo() << info.msg; qInfo() << info.msg;
auto f = clientCore_->GetBuffer(info, FBT_CLI_CAN_SEND, frame->fid); auto f = clientCore_->GetBuffer(info, FBT_CLI_CAN_SEND, frame->fid);
if (!ClientCore::asyncInvoke(clientCore_, [this, f]() { return clientCore_->Send(f); })) { if (!ClientCore::syncInvoke(clientCore_, [this, f]() { return clientCore_->Send(f); })) {
qCritical() << QString(tr("open recv file:%2 success, but reply %2 failed.")).arg(info.msg, frame->fid); qCritical() << QString(tr("open recv file:%2 success, but reply %2 failed.")).arg(info.msg, frame->fid);
downTask_->file.close(); downTask_->file.close();
return; return;
@@ -217,7 +217,7 @@ void FileTrans::fbtReqDown(QSharedPointer<FrameBuffer> frame)
// reply fileinfo // reply fileinfo
auto f = clientCore_->GetBuffer(info, FBT_CLI_FILE_INFO, frame->fid); auto f = clientCore_->GetBuffer(info, FBT_CLI_FILE_INFO, frame->fid);
if (!ClientCore::asyncInvoke(clientCore_, [this, f]() { return clientCore_->Send(f); })) { if (!ClientCore::syncInvoke(clientCore_, [this, f]() { return clientCore_->Send(f); })) {
qCritical() << QString(tr("send file %1 info failed.")).arg(info.fromPath); qCritical() << QString(tr("send file %1 info failed.")).arg(info.fromPath);
doTask->file.close(); doTask->file.close();
return; return;
@@ -237,7 +237,7 @@ void FileTrans::fbtTransDone(QSharedPointer<FrameBuffer> frame)
info.msg = QString(tr("recv file:%1 success.")).arg(downTask_->file.fileName()); info.msg = QString(tr("recv file:%1 success.")).arg(downTask_->file.fileName());
qInfo() << info.msg; qInfo() << info.msg;
auto f = clientCore_->GetBuffer(info, FBT_CLI_CAN_DOWN, frame->fid); auto f = clientCore_->GetBuffer(info, FBT_CLI_CAN_DOWN, frame->fid);
ClientCore::asyncInvoke(clientCore_, [this, f]() { return clientCore_->Send(f); }); ClientCore::syncInvoke(clientCore_, [this, f]() { return clientCore_->Send(f); });
return; return;
} }
qCritical() << QString(tr("recv file:%1 done sigal, but file not opened.")).arg(info.msg); qCritical() << QString(tr("recv file:%1 done sigal, but file not opened.")).arg(info.msg);
@@ -272,7 +272,7 @@ void FileTrans::fbtFileBuffer(QSharedPointer<FrameBuffer> frame)
InfoMsg info; InfoMsg info;
info.msg = downTask_->file.errorString(); info.msg = downTask_->file.errorString();
auto f = clientCore_->GetBuffer(info, FBT_CLI_TRANS_FAILED, frame->fid); auto f = clientCore_->GetBuffer(info, FBT_CLI_TRANS_FAILED, frame->fid);
ClientCore::asyncInvoke(clientCore_, [this, f]() { return clientCore_->Send(f); }); ClientCore::syncInvoke(clientCore_, [this, f]() { return clientCore_->Send(f); });
downTask_->file.close(); downTask_->file.close();
} }
downTask_->tranSize += ws; downTask_->tranSize += ws;
@@ -372,7 +372,7 @@ void SendThread::run()
InfoMsg info; InfoMsg info;
auto f = cliCore_->GetBuffer(info, FBT_CLI_TRANS_DONE, task_->task.remoteId); auto f = cliCore_->GetBuffer(info, FBT_CLI_TRANS_DONE, task_->task.remoteId);
ClientCore::asyncInvoke(cliCore_, [this, f]() { return cliCore_->Send(f); }); ClientCore::syncInvoke(cliCore_, [this, f]() { return cliCore_->Send(f); });
task_->file.close(); task_->file.close();
task_->state = TaskState::STATE_FINISH; task_->state = TaskState::STATE_FINISH;
} }

View File

@@ -19,7 +19,7 @@ bool RemoteFile::GetHome()
{ {
InfoMsg info; InfoMsg info;
auto frame = cliCore_->GetBuffer(info, FBT_CLI_ASK_HOME, cliCore_->GetRemoteID()); auto frame = cliCore_->GetBuffer(info, FBT_CLI_ASK_HOME, cliCore_->GetRemoteID());
return ClientCore::asyncInvoke(cliCore_, [this, frame]() { return cliCore_->Send(frame); }); return ClientCore::syncInvoke(cliCore_, [this, frame]() { return cliCore_->Send(frame); });
} }
bool RemoteFile::GetDirFile(const QString& dir) bool RemoteFile::GetDirFile(const QString& dir)
@@ -27,5 +27,5 @@ bool RemoteFile::GetDirFile(const QString& dir)
InfoMsg info; InfoMsg info;
info.msg = dir; info.msg = dir;
auto frame = cliCore_->GetBuffer(info, FBT_CLI_ASK_DIRFILE, cliCore_->GetRemoteID()); auto frame = cliCore_->GetBuffer(info, FBT_CLI_ASK_DIRFILE, cliCore_->GetRemoteID());
return ClientCore::asyncInvoke(cliCore_, [this, frame]() { return cliCore_->Send(frame); }); return ClientCore::syncInvoke(cliCore_, [this, frame]() { return cliCore_->Send(frame); });
} }

View File

@@ -18,10 +18,38 @@ Connecter::~Connecter()
delete ui; delete ui;
} }
void Connecter::SetClientCore(ClientCore* clientCore) void Connecter::RunWorker(ClientCore* clientCore)
{ {
clientCore_ = clientCore; clientCore_ = clientCore;
connect(clientCore_, &ClientCore::sigClients, this, &Connecter::HandleClients); connect(clientCore_, &ClientCore::sigClients, this, &Connecter::HandleClients);
sockWorker_ = new SocketWorker(clientCore_, nullptr);
clientCore_->moveToThread(sockWorker_);
connect(clientCore_, &ClientCore::conSuccess, this, [this]() {
setState(ConnectState::CS_CONNECTED);
qInfo() << QString(tr("Connected."));
});
connect(clientCore_, &ClientCore::conFailed, this, [this]() {
setState(ConnectState::CS_DISCONNECT);
qInfo() << QString(tr("Connect failed."));
});
connect(clientCore_, &ClientCore::connecting, this, [this]() {
setState(ConnectState::CS_CONNECTING);
qInfo() << QString(tr("Connecting..."));
});
connect(clientCore_, &ClientCore::sigDisconnect, this, [this]() {
setState(ConnectState::CS_DISCONNECT);
qInfo() << QString(tr("Disconnected."));
});
connect(this, &Connecter::sigDoConnect, clientCore_, &ClientCore::DoConnect);
connect(sockWorker_, &QThread::finished, sockWorker_, &QObject::deleteLater);
sockWorker_->start();
} }
void Connecter::SetRemoteCall(const std::function<void(const QString& id)>& call) void Connecter::SetRemoteCall(const std::function<void(const QString& id)>& call)
@@ -46,33 +74,7 @@ void Connecter::Connect()
FTCommon::msg(this, tr("IP or Port is empty.")); FTCommon::msg(this, tr("IP or Port is empty."));
return; return;
} }
emit sigDoConnect(ip, port.toInt());
sockWorker_ = new SocketWorker(clientCore_, nullptr);
clientCore_->moveToThread(sockWorker_);
connect(sockWorker_, &SocketWorker::conSuccess, this, [this]() {
setState(ConnectState::CS_CONNECTED);
qInfo() << QString(tr("Connected."));
});
connect(sockWorker_, &SocketWorker::conFailed, this, [this]() {
setState(ConnectState::CS_DISCONNECT);
qInfo() << QString(tr("Connect failed."));
});
connect(sockWorker_, &SocketWorker::connecting, this, [this]() {
setState(ConnectState::CS_CONNECTING);
qInfo() << QString(tr("Connecting..."));
});
connect(sockWorker_, &SocketWorker::disconnected, this, [this]() {
setState(ConnectState::CS_DISCONNECT);
qInfo() << QString(tr("Disconnected."));
});
connect(sockWorker_, &QThread::finished, sockWorker_, &QObject::deleteLater);
sockWorker_->SetConnectInfo(ip, port.toInt());
sockWorker_->start();
} }
void Connecter::setState(ConnectState cs) void Connecter::setState(ConnectState cs)
@@ -107,7 +109,7 @@ void Connecter::RefreshClient()
auto frame = QSharedPointer<FrameBuffer>::create(); auto frame = QSharedPointer<FrameBuffer>::create();
frame->data = infoPack(info); frame->data = infoPack(info);
frame->type = FBT_SER_MSG_ASKCLIENTS; frame->type = FBT_SER_MSG_ASKCLIENTS;
auto sendRet = ClientCore::asyncInvoke(clientCore_, [this, frame]() { return clientCore_->Send(frame); }); auto sendRet = ClientCore::syncInvoke(clientCore_, [this, frame]() { return clientCore_->Send(frame); });
if (!sendRet) { if (!sendRet) {
qCritical() << QString(tr("send ask client list failed.")); qCritical() << QString(tr("send ask client list failed."));
return; return;

View File

@@ -26,12 +26,13 @@ public:
~Connecter(); ~Connecter();
public: public:
void SetClientCore(ClientCore* clientCore); void RunWorker(ClientCore* clientCore);
void SetRemoteCall(const std::function<void(const QString& id)>& call); void SetRemoteCall(const std::function<void(const QString& id)>& call);
void HandleClients(const InfoClientVec& clients); void HandleClients(const InfoClientVec& clients);
signals: signals:
void sendConnect(ConnectState cs); void sendConnect(ConnectState cs);
void sigDoConnect(const QString& ip, quint16 port);
private: private:
void InitControl(); void InitControl();

View File

@@ -40,7 +40,7 @@ void frelayGUI::InitControl()
transform_->SetClientCore(clientCore_); transform_->SetClientCore(clientCore_);
connecter_ = new Connecter(this); connecter_ = new Connecter(this);
connecter_->SetClientCore(clientCore_); connecter_->RunWorker(clientCore_);
connecter_->SetRemoteCall([this](const QString& id) { clientCore_->SetRemoteID(id); }); connecter_->SetRemoteCall([this](const QString& id) { clientCore_->SetRemoteID(id); });
localFile_ = new FileManager(this); localFile_ = new FileManager(this);
@@ -88,12 +88,9 @@ void frelayGUI::ControlLayout()
void frelayGUI::ControlMsgHander(QtMsgType type, const QMessageLogContext& context, const QString& msg) void frelayGUI::ControlMsgHander(QtMsgType type, const QMessageLogContext& context, const QString& msg)
{ {
Q_UNUSED(context); QMetaObject::invokeMethod(
qApp,
//if (!qApp || !qobject_cast<frelayGUI*>(qApp->activeWindow())) { [type, msg]() {
// return;
//}
switch (type) { switch (type) {
case QtDebugMsg: case QtDebugMsg:
logPrint->Debug(msg); logPrint->Debug(msg);
@@ -104,14 +101,12 @@ void frelayGUI::ControlMsgHander(QtMsgType type, const QMessageLogContext& conte
case QtWarningMsg: case QtWarningMsg:
logPrint->Warn(msg); logPrint->Warn(msg);
break; break;
case QtCriticalMsg: default:
case QtFatalMsg:
logPrint->Error(msg); logPrint->Error(msg);
break; break;
default:
logPrint->Error("Unknown QtMsgType type.");
break;
} }
},
Qt::QueuedConnection);
} }
void frelayGUI::HandleTask(const QVector<TransTask>& tasks) void frelayGUI::HandleTask(const QVector<TransTask>& tasks)