socket: Unified Soket thread management.
This commit is contained in:
@@ -49,6 +49,7 @@ void ClientCore::onReadyRead()
|
||||
void ClientCore::onDisconnected()
|
||||
{
|
||||
qCritical() << QString("client %1 disconnected...").arg(remoteID_);
|
||||
emit sigDisconnect();
|
||||
}
|
||||
|
||||
void ClientCore::UseFrame(QSharedPointer<FrameBuffer> frame)
|
||||
@@ -172,3 +173,32 @@ QString ClientCore::GetOwnID()
|
||||
{
|
||||
return ownID_;
|
||||
}
|
||||
|
||||
SocketWorker::SocketWorker(ClientCore* core, QObject* parent) : QThread(parent), core_(core)
|
||||
{
|
||||
connect(core_, &ClientCore::sigDisconnect, this, [this]() {
|
||||
emit disconnected();
|
||||
thread()->quit();
|
||||
});
|
||||
}
|
||||
|
||||
SocketWorker::~SocketWorker()
|
||||
{
|
||||
}
|
||||
|
||||
void SocketWorker::SetConnectInfo(const QString& ip, qint16 port)
|
||||
{
|
||||
ip_ = ip;
|
||||
port_ = port;
|
||||
}
|
||||
|
||||
void SocketWorker::run()
|
||||
{
|
||||
emit connecting();
|
||||
if (!core_->Connect(ip_, port_)) {
|
||||
emit conFailed();
|
||||
return;
|
||||
}
|
||||
emit conSuccess();
|
||||
exec();
|
||||
}
|
||||
|
||||
@@ -8,9 +8,12 @@
|
||||
#include <LocalFile.h>
|
||||
#include <Protocol.h>
|
||||
#include <QDataStream>
|
||||
#include <QFuture>
|
||||
#include <QHostAddress>
|
||||
#include <QMutex>
|
||||
#include <QMutexLocker>
|
||||
#include <QPromise>
|
||||
#include <QQueue>
|
||||
#include <QTcpSocket>
|
||||
#include <QThread>
|
||||
#include <array>
|
||||
@@ -29,13 +32,41 @@ public:
|
||||
bool Send(QSharedPointer<FrameBuffer> frame);
|
||||
bool Send(const char* data, qint64 len);
|
||||
template <typename T> bool Send(const T& info, FrameBufferType type, const QString& tid)
|
||||
{
|
||||
auto f = GetBuffer<T>(info, type, tid);
|
||||
return Send(f);
|
||||
}
|
||||
template <typename T> QSharedPointer<FrameBuffer> GetBuffer(const T& info, FrameBufferType type, const QString& tid)
|
||||
{
|
||||
auto f = QSharedPointer<FrameBuffer>::create();
|
||||
f->tid = tid;
|
||||
f->data = infoPack<T>(info);
|
||||
f->type = type;
|
||||
return Send(f);
|
||||
return f;
|
||||
}
|
||||
template <typename Callable> static bool asyncInvoke(QObject* context, Callable&& func)
|
||||
{
|
||||
auto promise = QSharedPointer<QPromise<bool>>::create();
|
||||
QFuture<bool> future = promise->future();
|
||||
|
||||
QMetaObject::invokeMethod(
|
||||
context,
|
||||
[func = std::forward<Callable>(func), promise]() mutable {
|
||||
try {
|
||||
promise->addResult(func());
|
||||
} catch (...) {
|
||||
promise->addResult(false);
|
||||
}
|
||||
promise->finish();
|
||||
},
|
||||
Qt::QueuedConnection);
|
||||
|
||||
future.waitForFinished();
|
||||
return future.result();
|
||||
}
|
||||
|
||||
signals:
|
||||
void sigDisconnect();
|
||||
|
||||
private:
|
||||
void onReadyRead();
|
||||
@@ -71,4 +102,30 @@ public:
|
||||
std::array<std::function<void(QSharedPointer<FrameBuffer>)>, 256> frameCall_;
|
||||
};
|
||||
|
||||
class SocketWorker : public QThread
|
||||
{
|
||||
Q_OBJECT
|
||||
|
||||
public:
|
||||
SocketWorker(ClientCore* core, QObject* parent = nullptr);
|
||||
~SocketWorker();
|
||||
|
||||
public:
|
||||
void SetConnectInfo(const QString& ip, qint16 port);
|
||||
|
||||
protected:
|
||||
void run() override;
|
||||
|
||||
signals:
|
||||
void conSuccess();
|
||||
void connecting();
|
||||
void conFailed();
|
||||
void disconnected();
|
||||
|
||||
private:
|
||||
QString ip_;
|
||||
qint16 port_;
|
||||
ClientCore* core_;
|
||||
};
|
||||
|
||||
#endif // CLIENTCORE_H
|
||||
@@ -63,11 +63,11 @@ void FileTrans::ReqDownFile(const TransTask& task)
|
||||
info.fromPath = task.remotePath;
|
||||
|
||||
downTask_->file.setFileName(Util::Get2FilePath(task.remotePath, task.localPath));
|
||||
if (!downTask_->file.open(QIODevice::WriteOnly)) {
|
||||
if (!downTask_->file.open(QIODevice::WriteOnly)) {
|
||||
qCritical() << QString(tr("open file [%1] failed.")).arg(downTask_->file.fileName());
|
||||
downTask_->state = TaskState::STATE_NONE;
|
||||
return;
|
||||
}
|
||||
downTask_->state = TaskState::STATE_NONE;
|
||||
return;
|
||||
}
|
||||
if (!clientCore_->Send<InfoMsg>(info, FBT_CLI_REQ_DOWN, task.remoteId)) {
|
||||
qCritical() << QString(tr("send req send failed: %1")).arg(info.msg);
|
||||
sendTask_->state = TaskState::STATE_NONE;
|
||||
@@ -114,7 +114,7 @@ void FileTrans::RegisterFrameCall()
|
||||
void FileTrans::fbtReqSend(QSharedPointer<FrameBuffer> frame)
|
||||
{
|
||||
InfoMsg info = infoUnpack<InfoMsg>(frame->data);
|
||||
qInfo() << QString(tr("%1 req send: %2 to %3")).arg(frame->fid).arg(info.fromPath).arg(info.toPath);
|
||||
qInfo() << QString(tr("%1 req send: %2 to %3")).arg(frame->fid).arg(info.fromPath, info.toPath);
|
||||
|
||||
// judge is same client's same file.
|
||||
|
||||
@@ -132,7 +132,7 @@ void FileTrans::fbtReqSend(QSharedPointer<FrameBuffer> frame)
|
||||
info.msg = QString(tr("open file failed: %1")).arg(newerPath);
|
||||
qCritical() << info.msg;
|
||||
if (!clientCore_->Send<InfoMsg>(info, FBT_CLI_CANOT_SEND, frame->fid)) {
|
||||
qCritical() << QString(tr("open recv file:%2 failed, and reply %2 failed.")).arg(info.msg).arg(frame->fid);
|
||||
qCritical() << QString(tr("open recv file:%2 failed, and reply %2 failed.")).arg(info.msg, frame->fid);
|
||||
downTask_->file.close();
|
||||
return;
|
||||
}
|
||||
@@ -144,8 +144,10 @@ void FileTrans::fbtReqSend(QSharedPointer<FrameBuffer> frame)
|
||||
downTask_->permission = info.permissions;
|
||||
|
||||
info.msg = QString(tr("open recv file success: %1")).arg(newerPath);
|
||||
if (!clientCore_->Send<InfoMsg>(info, FBT_CLI_CAN_SEND, frame->fid)) {
|
||||
qCritical() << QString(tr("open recv file:%2 success, but reply %2 failed.")).arg(info.msg).arg(frame->fid);
|
||||
auto f = clientCore_->GetBuffer(info, FBT_CLI_CAN_SEND, frame->fid);
|
||||
auto sendRet = ClientCore::asyncInvoke(clientCore_, [this, f]() { return clientCore_->Send(f); });
|
||||
if (!sendRet) {
|
||||
qCritical() << QString(tr("open recv file:%2 success, but reply %2 failed.")).arg(info.msg, frame->fid);
|
||||
downTask_->file.close();
|
||||
return;
|
||||
}
|
||||
@@ -200,7 +202,7 @@ void FileTrans::fbtCanDown(QSharedPointer<FrameBuffer> frame)
|
||||
void FileTrans::fbtCanotDown(QSharedPointer<FrameBuffer> frame)
|
||||
{
|
||||
InfoMsg info = infoUnpack<InfoMsg>(frame->data);
|
||||
qCritical() << QString(tr("request send file:%1 failed. reason:%2")).arg(info.fromPath).arg(info.msg);
|
||||
qCritical() << QString(tr("request send file:%1 failed. reason:%2")).arg(info.fromPath, info.msg);
|
||||
}
|
||||
|
||||
void FileTrans::fbtFileBuffer(QSharedPointer<FrameBuffer> frame)
|
||||
@@ -223,7 +225,7 @@ void FileTrans::fbtFileBuffer(QSharedPointer<FrameBuffer> frame)
|
||||
void FileTrans::fbtCanotSend(QSharedPointer<FrameBuffer> frame)
|
||||
{
|
||||
InfoMsg info = infoUnpack<InfoMsg>(frame->data);
|
||||
qCritical() << QString(tr("request file:%1 failed. reason:%2")).arg(info.fromPath).arg(info.msg);
|
||||
qCritical() << QString(tr("request file:%1 failed. reason:%2")).arg(info.fromPath, info.msg);
|
||||
if (sendTask_->file.isOpen()) {
|
||||
sendTask_->file.close();
|
||||
}
|
||||
@@ -232,7 +234,7 @@ void FileTrans::fbtCanotSend(QSharedPointer<FrameBuffer> frame)
|
||||
void FileTrans::fbtCanSend(QSharedPointer<FrameBuffer> frame)
|
||||
{
|
||||
InfoMsg info = infoUnpack<InfoMsg>(frame->data);
|
||||
qInfo() << QString(tr("start trans file:%1 to %2")).arg(info.fromPath).arg(frame->fid);
|
||||
qInfo() << QString(tr("start trans file:%1 to %2")).arg(info.fromPath, frame->fid);
|
||||
SendFile(sendTask_);
|
||||
}
|
||||
|
||||
@@ -255,6 +257,21 @@ void FileTrans::SendFile(const QSharedPointer<DoTransTask>& task)
|
||||
sendThread->run();
|
||||
}
|
||||
|
||||
QFuture<bool> FileTrans::sendFrameAsync(const QSharedPointer<FrameBuffer>& frame)
|
||||
{
|
||||
auto promise = QSharedPointer<QPromise<bool>>::create();
|
||||
QFuture<bool> future = promise->future();
|
||||
QMetaObject::invokeMethod(
|
||||
clientCore_,
|
||||
[this, frame, promise]() mutable {
|
||||
bool ret = clientCore_->Send(frame);
|
||||
promise->addResult(ret);
|
||||
promise->finish();
|
||||
},
|
||||
Qt::QueuedConnection);
|
||||
return future;
|
||||
}
|
||||
|
||||
SendThread::SendThread(ClientCore* clientCore) : cliCore_(clientCore)
|
||||
{
|
||||
}
|
||||
@@ -265,23 +282,36 @@ void SendThread::run()
|
||||
auto frame = QSharedPointer<FrameBuffer>::create();
|
||||
frame->tid = task_->task.remoteId;
|
||||
frame->type = FBT_CLI_FILE_BUFFER;
|
||||
frame->call = [this](QSharedPointer<FrameBuffer> frame) { sendCall(frame); };
|
||||
|
||||
bool suc = true;
|
||||
isSuccess_ = true;
|
||||
while (!task_->file.atEnd()) {
|
||||
frame->data.resize(CHUNK_BUF_SIZE);
|
||||
auto br = task_->file.read(frame->data.data(), CHUNK_BUF_SIZE);
|
||||
if (br == -1) {
|
||||
qCritical() << QString(tr("read file failed: %1")).arg(task_->file.errorString());
|
||||
suc = false;
|
||||
isSuccess_ = false;
|
||||
break;
|
||||
}
|
||||
frame->data.resize(br);
|
||||
if (!cliCore_->Send(frame)) {
|
||||
|
||||
while (curSendCount_ >= MAX_SEND_TASK) {
|
||||
QThread::msleep(1);
|
||||
// shoule add abort action mark.
|
||||
}
|
||||
|
||||
QMetaObject::invokeMethod(this, [this, frame] {
|
||||
frame->sendRet = cliCore_->Send(frame);
|
||||
if (frame->call) {
|
||||
frame->call(frame);
|
||||
}
|
||||
});
|
||||
++curSendCount_;
|
||||
|
||||
if (!isSuccess_) {
|
||||
qCritical() << QString(tr("send to %1 file failed.")).arg(task_->task.remoteId);
|
||||
suc = false;
|
||||
break;
|
||||
}
|
||||
task_->tranSize += br;
|
||||
}
|
||||
task_->file.close();
|
||||
}
|
||||
@@ -289,4 +319,14 @@ void SendThread::run()
|
||||
void SendThread::setTask(const QSharedPointer<DoTransTask>& task)
|
||||
{
|
||||
task_ = task;
|
||||
}
|
||||
}
|
||||
|
||||
void SendThread::sendCall(QSharedPointer<FrameBuffer> frame)
|
||||
{
|
||||
if (frame->sendRet) {
|
||||
--curSendCount_;
|
||||
task_->tranSize += frame->data.size();
|
||||
} else {
|
||||
isSuccess_ = false;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,9 +5,13 @@
|
||||
#include <QMap>
|
||||
#include <QMutex>
|
||||
#include <QVector>
|
||||
#include <QFuture>
|
||||
#include <QPromise>
|
||||
|
||||
#include "ClientCore.h"
|
||||
|
||||
constexpr int MAX_SEND_TASK = 10;
|
||||
|
||||
struct TransTask {
|
||||
bool isUpload{false};
|
||||
QString localId;
|
||||
@@ -43,9 +47,12 @@ public:
|
||||
public:
|
||||
void run() override;
|
||||
void setTask(const QSharedPointer<DoTransTask>& task);
|
||||
void sendCall(QSharedPointer<FrameBuffer> frame);
|
||||
|
||||
private:
|
||||
bool isSuccess_{ false };
|
||||
ClientCore* cliCore_;
|
||||
quint32 curSendCount_{0};
|
||||
QSharedPointer<DoTransTask> task_;
|
||||
};
|
||||
|
||||
@@ -76,6 +83,7 @@ private:
|
||||
private:
|
||||
void RegisterFrameCall();
|
||||
void SendFile(const QSharedPointer<DoTransTask>& task);
|
||||
QFuture<bool> sendFrameAsync(const QSharedPointer<FrameBuffer>& frame);
|
||||
|
||||
private:
|
||||
QSharedPointer<DoTransTask> downTask_;
|
||||
|
||||
@@ -14,12 +14,13 @@ void RemoteFile::setClientCore(ClientCore* cliCore)
|
||||
bool RemoteFile::GetHome()
|
||||
{
|
||||
InfoMsg info;
|
||||
return cliCore_->Send<InfoMsg>(info, FBT_CLI_ASK_HOME, cliCore_->GetRemoteID());
|
||||
auto frame = cliCore_->GetBuffer(info, FBT_CLI_ASK_HOME, cliCore_->GetRemoteID());
|
||||
return ClientCore::asyncInvoke(cliCore_, [this, frame]() { return cliCore_->Send(frame); });
|
||||
}
|
||||
|
||||
bool RemoteFile::GetDirFile(const QString& dir)
|
||||
{
|
||||
InfoMsg info;
|
||||
info.msg = dir;
|
||||
return cliCore_->Send<InfoMsg>(info, FBT_CLI_ASK_DIRFILE, cliCore_->GetRemoteID());
|
||||
auto frame = cliCore_->GetBuffer(info, FBT_CLI_ASK_DIRFILE, cliCore_->GetRemoteID());
|
||||
return ClientCore::asyncInvoke(cliCore_, [this, frame]() { return cliCore_->Send(frame); });
|
||||
}
|
||||
Reference in New Issue
Block a user