Files
frelay/ClientCore/ClientCore.cpp
2026-03-24 16:53:06 +08:00

446 lines
12 KiB
C++

#include "ClientCore.h"
#include <QDebug>
/// Builds a client bound to the given io_context.
/// @param parent     Qt parent object.
/// @param ioContext  asio context used for the socket and for name resolution.
ClientCore::ClientCore(QObject* parent, asio::io_context& ioContext)
    : QObject(parent),
      ioContext_(ioContext),
      // Use the constructor argument directly so the socket's initialization
      // does not rely on ioContext_ having been initialized first.
      mSocket_(ioContext)
{
}
/// Nothing to release explicitly; members clean up after themselves.
ClientCore::~ClientCore() = default;
// Initialization hook (called from SocketWorker::run()). Currently a no-op:
// the body only contains disabled QTcpSocket signal wiring kept for reference.
void ClientCore::Instance()
{
// qDebug() << "Instance() thread:" << QThread::currentThread();
// connect(socket_, &QTcpSocket::readyRead, this, &ClientCore::onReadyRead);
// connect(socket_, &QTcpSocket::disconnected, this, &ClientCore::onDisconnected);
}
/// Attempts a connection and reports progress through signals:
/// connecting() first, then either conSuccess() or conFailed().
/// @param ip    Host name or address to connect to.
/// @param port  TCP port to connect to.
void ClientCore::DoConnect(const QString& ip, quint16 port)
{
    emit connecting();

    const bool established = Connect(ip, port);
    if (established) {
        emit conSuccess();
    } else {
        emit conFailed();
    }
}
/// Synchronously resolves and connects the socket to ip:port.
///
/// Fix: the original returned from inside the try block right after
/// asio::connect(), so the success log and `connected_ = true` were
/// unreachable and IsConnect() never reported an established connection.
///
/// @param ip    Host name or address to connect to.
/// @param port  TCP port to connect to.
/// @return true if already connected or the connection succeeded; false if
///         resolving or connecting threw.
bool ClientCore::Connect(const QString& ip, quint16 port)
{
    if (connected_) {
        qInfo() << QString(tr("已连接。"));
        return true;
    }

    try {
        asio::ip::tcp::resolver resolver(ioContext_);
        asio::ip::tcp::resolver::results_type endpoints =
            resolver.resolve(ip.toStdString(), std::to_string(port));
        asio::connect(mSocket_, endpoints);
    } catch (const std::exception& ex) {
        qCritical() << QString(tr("%1:%2 连接失败。%3")).arg(ip).arg(port).arg(ex.what());
        return false;
    }

    // Only reached when resolve + connect completed without throwing.
    qInfo() << QString(tr("%1:%2 连接成功。")).arg(ip).arg(port);
    connected_ = true;
    return true;
}
void ClientCore::Disconnect()
{
QMutexLocker locker(&conMutex_);
if (mSocket_.is_open()) {
try {
mSocket_.shutdown(asio::ip::tcp::socket::shutdown_both);
mSocket_.close();
} catch (const std::exception& ex) {
qCritical() << QString(tr("Error during disconnection: %1")).arg(ex.what());
}
}
// if (socket_ && socket_->state() != QAbstractSocket::UnconnectedState) {
// socket_->disconnectFromHost();
// if (socket_->state() != QAbstractSocket::UnconnectedState) {
// socket_->waitForDisconnected(1000);
// }
// }
connected_ = false;
}
/// Receive loop coroutine: reads from the socket while it is open, appends the
/// bytes to recvBuffer_, and pushes every frame Protocol::ParseBuffer() can
/// extract onto frameQueue_ (under frameMutex_). Any exception is logged and
/// ends the coroutine.
asio::awaitable<void> ClientCore::recvData()
{
    while (mSocket_.is_open()) {
        try {
            const auto received =
                co_await mSocket_.async_read_some(asio::buffer(mBuffer_), asio::use_awaitable);
            recvBuffer_.append(mBuffer_.data(), received);

            // Drain every complete frame currently available in the buffer.
            for (auto parsed = Protocol::ParseBuffer(recvBuffer_); parsed != nullptr;
                 parsed = Protocol::ParseBuffer(recvBuffer_)) {
                std::unique_lock guard(frameMutex_);
                // Temporary approach; to be optimized later so that control
                // messages are sent out first.
                frameQueue_.push(parsed);
            }
        } catch (const std::exception& ex) {
            qCritical() << QString(tr("Error during receiving header: %1")).arg(ex.what());
            co_return;
        }
    }
}
// Former QTcpSocket readyRead handler. Currently a no-op: the body only
// contains the disabled read-and-parse loop kept for reference.
void ClientCore::onReadyRead()
{
// QByteArray data = socket_->readAll();
// recvBuffer_.append(data);
// while (true) {
// auto frame = Protocol::ParseBuffer(recvBuffer_);
// if (frame == nullptr) {
// break;
// }
// UseFrame(frame);
// }
}
void ClientCore::onDisconnected()
{
connected_ = false;
qCritical() << QString("你 [%1] 断开了。").arg(ownID_);
emit sigDisconnect();
}
/// Handles an "ask" frame: unpacks the InfoMsg, performs the requested action
/// (file-existence check, rename, delete, or new directory), and sends the
/// updated message back as FBT_MSGINFO_ANSWER addressed to frame->fid.
/// Unknown commands are only logged.
/// @param frame  The received ask frame.
asio::awaitable<void> ClientCore::handleAsk(QSharedPointer<FrameBuffer> frame)
{
    InfoMsg msg = infoUnpack<InfoMsg>(frame->data);

    // TODO: handle ask requests
    if (msg.command == STRMSG_AC_CHECK_FILE_EXIST) {
        msg.command = STRMSG_AC_ANSWER_FILE_EXIST;
        for (auto& entry : msg.mapData) {
            if (entry.command == STRMSG_AC_UP) {
                // Upload: the target directory must exist, and the target file
                // is flagged if it is already present.
                if (Util::DirExist(entry.remotePath, false)) {
                    auto candidate = Util::Get2FilePath(entry.localPath, entry.remotePath);
                    if (Util::FileExist(candidate)) {
                        entry.state = static_cast<qint32>(FCS_FILE_EXIST);
                    }
                } else {
                    entry.state = static_cast<qint32>(FCS_DIR_NOT_EXIST);
                }
            } else if (!Util::FileExist(entry.remotePath)) {
                // Any other command: the remote file itself must exist.
                entry.state = static_cast<qint32>(FCS_FILE_NOT_EXIST);
            }
        }
        const bool sent = co_await Send<InfoMsg>(msg, FBT_MSGINFO_ANSWER, frame->fid);
        if (!sent) {
            auto failure = tr("") + frame->fid + tr("返回检查文件存在性消息失败。");
            qCritical() << failure;
        }
    } else if (msg.command == STRMSG_AC_RENAME_FILEDIR) {
        msg.command = STRMSG_AC_ANSWER_RENAME_FILEDIR;
        msg.msg = Util::Rename(msg.fromPath, msg.toPath, msg.type == STR_DIR);
        const bool sent = co_await Send<InfoMsg>(msg, FBT_MSGINFO_ANSWER, frame->fid);
        if (!sent) {
            auto failure = tr("") + frame->fid + tr("返回重命名结果消息失败。");
            qCritical() << failure;
        }
    } else if (msg.command == STRMSG_AC_DEL_FILEDIR) {
        msg.command = STRMSG_AC_ANSWER_DEL_FILEDIR;
        msg.msg = Util::Delete(msg.fromPath);
        const bool sent = co_await Send<InfoMsg>(msg, FBT_MSGINFO_ANSWER, frame->fid);
        if (!sent) {
            auto failure = tr("") + frame->fid + tr("返回删除结果消息失败。");
            qCritical() << failure;
        }
    } else if (msg.command == STRMSG_AC_NEW_DIR) {
        msg.command = STRMSG_AC_ANSWER_NEW_DIR;
        msg.msg = Util::NewDir(msg.fromPath);
        const bool sent = co_await Send<InfoMsg>(msg, FBT_MSGINFO_ANSWER, frame->fid);
        if (!sent) {
            auto failure = tr("") + frame->fid + tr("返回新建结果消息失败。");
            qCritical() << failure;
        }
    } else {
        // Unknown message
        qWarning() << QString(tr("未知询问信息类型:%1")).arg(msg.command);
    }
}
/// Dispatches one received frame by its type. Most types are forwarded to the
/// matching Qt signal; directory/home queries and ask messages are answered
/// directly over the socket. Unknown types are logged.
/// @param frame  The frame to process.
asio::awaitable<void> ClientCore::UseFrame(QSharedPointer<FrameBuffer> frame)
{
    switch (frame->type) {
    case FrameBufferType::FBT_SER_MSG_ASKCLIENTS: {
        InfoClientVec clients = infoUnpack<InfoClientVec>(frame->data);
        emit sigClients(clients);
        break;
    }
    case FrameBufferType::FBT_SER_MSG_YOURID: {
        // Remember the ID carried in the frame and publish it globally.
        ownID_ = frame->data;
        GlobalData::Ins()->SetLocalID(ownID_);
        qInfo() << QString(tr("本机ID: %1")).arg(ownID_);
        emit sigYourId(frame);
        break;
    }
    case FrameBufferType::FBT_CLI_ANS_DIRFILE: {
        DirFileInfoVec listing = infoUnpack<DirFileInfoVec>(frame->data);
        emit sigFiles(listing);
        break;
    }
    case FrameBufferType::FBT_CLI_ASK_DIRFILE: {
        // Peer asks for a directory listing: read it locally and reply.
        DirFileInfoVec listing;
        InfoMsg request = infoUnpack<InfoMsg>(frame->data);
        if (!localFile_.GetDirFile(request.msg, listing)) {
            qWarning() << QString(tr("访问文件失败: %1")).arg(request.msg);
            co_return;
        }
        const bool sent = co_await Send<DirFileInfoVec>(listing, FBT_CLI_ANS_DIRFILE, frame->fid);
        if (!sent) {
            qCritical() << QString(tr("发送文件列表结果失败。"));
            co_return;
        }
        break;
    }
    case FrameBufferType::FBT_CLI_ASK_HOME: {
        // Peer asks for the home directory and drive list.
        InfoMsg reply;
        reply.msg = Util::GetUserHome();
        reply.list = Util::GetLocalDrivers();
        const bool sent = co_await Send<InfoMsg>(reply, FBT_CLI_ANS_HOME, frame->fid);
        if (!sent) {
            qCritical() << QString(tr("send home failed."));
            co_return;
        }
        break;
    }
    case FrameBufferType::FBT_CLI_ANS_HOME: {
        InfoMsg home = infoUnpack<InfoMsg>(frame->data);
        qInfo() << QString(tr("用户目录:%1")).arg(home.msg);
        emit sigPath(home.msg, home.list);
        break;
    }
    case FrameBufferType::FBT_SER_MSG_FORWARD_FAILED: {
        qCritical() << QString(tr("转发数据失败,fid:%1, tid:%2, type:%3"))
                           .arg(frame->fid)
                           .arg(frame->tid)
                           .arg(static_cast<uint32_t>(frame->type));
        break;
    }

    // The remaining frame types are handed to listeners unchanged.
    case FrameBufferType::FBT_CLI_REQ_SEND: {
        emit sigReqSend(frame);
        break;
    }
    case FrameBufferType::FBT_CLI_REQ_DOWN: {
        emit sigReqDown(frame);
        break;
    }
    case FrameBufferType::FBT_CLI_TRANS_DONE: {
        emit sigTransDone(frame);
        break;
    }
    case FrameBufferType::FBT_CLI_CAN_SEND: {
        emit sigCanSend(frame);
        break;
    }
    case FrameBufferType::FBT_CLI_CANOT_SEND: {
        emit sigCanotSend(frame);
        break;
    }
    case FrameBufferType::FBT_CLI_CANOT_DOWN: {
        emit sigCanotDown(frame);
        break;
    }
    case FrameBufferType::FBT_CLI_CAN_DOWN: {
        emit sigCanDown(frame);
        break;
    }
    case FrameBufferType::FBT_CLI_FILE_BUFFER: {
        emit sigFileBuffer(frame);
        break;
    }
    case FrameBufferType::FBT_CLI_TRANS_FAILED: {
        emit sigTransFailed(frame);
        break;
    }
    case FrameBufferType::FBT_CLI_FILE_INFO: {
        emit sigFileInfo(frame);
        break;
    }
    case FrameBufferType::FBT_SER_MSG_OFFLINE: {
        // Drop the sender's ID from the tracked list before notifying listeners.
        popID(frame->fid);
        emit sigOffline(frame);
        break;
    }
    case FrameBufferType::FBT_MSGINFO_ASK: {
        co_await handleAsk(frame);
        break;
    }
    case FrameBufferType::FBT_MSGINFO_ANSWER: {
        emit sigMsgAnswer(frame);
        break;
    }
    case FrameBufferType::FBT_SER_FLOW_LIMIT: {
        emit sigFlowBack(frame);
        break;
    }
    case FrameBufferType::FBT_CLI_TRANS_INTERRUPT: {
        emit sigTransInterrupt(frame);
        break;
    }
    default:
        qCritical() << QString("未知的帧类型: %1").arg(frame->type);
        break;
    }
}
/// Serializes a frame with Protocol::PackBuffer() and writes it to the socket.
/// @param frame  Frame to send; a null frame is rejected.
/// @return false for a null frame, an empty packed buffer, or a failed write;
///         otherwise the result of the raw-buffer Send overload.
asio::awaitable<bool> ClientCore::Send(QSharedPointer<FrameBuffer> frame)
{
    if (frame == nullptr) {
        co_return false;
    }

    // Lives in the coroutine frame, so it stays valid across the co_await below.
    auto packed = Protocol::PackBuffer(frame);
    if (packed.size() == 0) {
        co_return false;
    }

    const bool delivered = co_await Send(packed.constData(), packed.size());
    co_return delivered;
}
/// Writes `size` bytes starting at `data` to the socket.
/// @param data  Start of the bytes to send; must remain valid until the
///              coroutine completes.
/// @param size  Number of bytes to send.
/// @return true when exactly `size` bytes were written; false if the write
///         threw (the error is logged).
asio::awaitable<bool> ClientCore::Send(const char* data, size_t size)
{
    try {
        const auto written =
            co_await asio::async_write(mSocket_, asio::buffer(data, size), asio::use_awaitable);
        co_return written == size;
    } catch (const std::exception& err) {
        qCritical() << QString("向服务器发送数据失败: %1").arg(err.what());
        co_return false;
    }
}
// Returns the current value of the connected_ flag.
bool ClientCore::IsConnect()
{
return connected_;
}
// Exposes the executor associated with the client's socket.
// NOTE(review): this returns a reference to whatever mSocket_.get_executor()
// yields — confirm the asio version in use returns a reference there and not
// a temporary, otherwise the returned reference would dangle.
const asio::any_io_executor& ClientCore::get_executor()
{
return mSocket_.get_executor();
}
// Records the given remote ID both in GlobalData and in this object's
// remoteID_ member.
void ClientCore::SetRemoteID(const QString& id)
{
GlobalData::Ins()->SetRemoteID(id);
remoteID_ = id;
}
// Returns a copy of the remote ID last stored by SetRemoteID().
QString ClientCore::GetRemoteID()
{
return remoteID_;
}
// Returns a copy of this client's own ID (ownID_).
QString ClientCore::GetOwnID()
{
return ownID_;
}
/// Adds `id` to remoteIDs_ unless it is already present. Takes the write lock
/// on rwIDLock_ for the duration of the call.
/// @param id  Remote ID to track.
void ClientCore::pushID(const QString& id)
{
    QWriteLocker guard(&rwIDLock_);
    if (remoteIDs_.contains(id)) {
        return;
    }
    remoteIDs_.push_back(id);
}
// Removes every occurrence of `id` from remoteIDs_ while holding the write
// lock on rwIDLock_.
void ClientCore::popID(const QString& id)
{
QWriteLocker locker(&rwIDLock_);
remoteIDs_.removeAll(id);
}
// Stores the ClientCore pointer for use in run(). The disabled code below
// used to quit the thread when the core reported a disconnect.
SocketWorker::SocketWorker(ClientCore* core, QObject* parent) : QThread(parent), core_(core)
{
// connect(core_, &ClientCore::sigDisconnect, this, [this]() {
// thread()->quit();
// });
}
/// No explicit cleanup is performed here.
SocketWorker::~SocketWorker() = default;
// Thread entry point: calls the core's Instance() hook on this thread, then
// enters the thread's event loop via exec().
void SocketWorker::run()
{
core_->Instance();
exec();
}
// Stores the ClientCore pointer that run() uses to send heartbeat frames.
HeatBeat::HeatBeat(ClientCore* core, QObject* parent) : QThread(parent), core_(core)
{
}
// Requests the heartbeat loop to stop by calling Stop().
// NOTE(review): nothing here waits for run() to return — confirm the owner
// waits for the thread to finish before destroying this object.
HeatBeat::~HeatBeat()
{
Stop();
}
// Clears the run flag; the loop in run() checks it once per iteration.
void HeatBeat::Stop()
{
isRun_ = false;
}
void HeatBeat::run()
{
isRun_ = true;
InfoMsg info;
auto frame = core_->GetBuffer(info, FBT_SER_MSG_HEARTBEAT, "");
while (isRun_) {
QThread::sleep(1);
if (!core_->IsConnect()) {
continue;
}
ClientCore::syncInvoke(core_, frame);
auto rid = core_->GetRemoteID();
if (!rid.isEmpty()) {
auto frame2 = core_->GetBuffer(info, FBT_SER_MSG_JUDGE_OTHER_ALIVE, rid);
ClientCore::syncInvoke(core_, frame2);
}
{
QReadLocker loker(&core_->rwIDLock_);
for (auto& id : core_->remoteIDs_) {
auto frame3 = core_->GetBuffer(info, FBT_SER_MSG_JUDGE_OTHER_ALIVE, id);
ClientCore::syncInvoke(core_, frame3);
}
}
}
}
// Forwards the parent to QThread; no other initialization is done here.
WaitThread::WaitThread(QObject* parent) : QThread(parent)
{
}
// Stores the ClientCore pointer in cli_.
void WaitThread::SetClient(ClientCore* cli)
{
cli_ = cli;
}
// Returns isAlreadyInter_, the flag that interrupCheck() sets.
bool WaitThread::IsQuit() const
{
return isAlreadyInter_;
}
/// Marks the wait as interrupted and emits sigCheckOver(). Only the first call
/// has an effect; later calls return immediately.
void WaitThread::interrupCheck()
{
    if (isAlreadyInter_) {
        return;
    }
    isAlreadyInter_ = true;
    emit sigCheckOver();
}