openai-api/server/server.cxx

164 lines
4.9 KiB
C++
Raw Permalink Normal View History

2025-03-04 15:17:10 +08:00
#include "server.h"
2025-04-14 21:01:36 +08:00
2025-04-14 22:49:10 +08:00
#include <cstdint>
2025-03-04 15:17:10 +08:00
#include <iostream>
2025-04-14 22:49:10 +08:00
// Builds a server bound to the given io_context that will listen on `port`.
// The acceptor is only constructed here; it is opened, bound and put into
// listening mode later by start().
Server::Server(asio::io_context& io_context, uint16_t port)
    : io_context_(io_context)
    , acceptor_(io_context)
{
    this->port_ = port;
}
2025-03-05 08:10:55 +08:00
// Releases every client thread handle still registered in clients_.
//
// The registry is shared with the client threads (each one removes its own
// entry under cli_mutex_ when it exits), so it is walked under the same mutex.
// Handles are only detached when still joinable: std::thread::detach() throws
// on a non-joinable thread, and an exception leaving a destructor terminates
// the process.
//
// NOTE(review): detached client threads keep using this Server object; the
// owner must make sure they have finished (or the process is exiting) before
// the Server is destroyed.
Server::~Server()
{
    std::unique_lock<std::mutex> lock(cli_mutex_);
    for (auto& client : clients_) {
        if (client.second.joinable()) {
            client.second.detach();
        }
    }
}
// Writes the exception's what() text to standard error, followed by a newline.
void Server::print_exception(const std::exception& e)
{
    const char* const reason = e.what();
    std::cerr << reason << '\n';
}
2025-03-04 15:17:10 +08:00
void Server::start()
{
asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), port_);
2025-03-04 15:17:10 +08:00
try {
acceptor_.open(endpoint.protocol());
// acceptor_.set_option(asio::socket_base::reuse_address(true));
acceptor_.bind(endpoint);
2025-03-04 15:17:10 +08:00
acceptor_.listen();
do_accept();
} catch (const std::exception& e) {
print_exception(e);
2025-03-04 15:17:10 +08:00
}
}
// Placeholder: currently a no-op. It neither closes the acceptor nor ends any
// running client thread.
void Server::stop()
{
}
void Server::set_worker(std::shared_ptr<COpenAI> worker, std::shared_ptr<CJsonOper> json)
{
worker_ = worker;
json_ = json;
}
2025-04-14 22:49:10 +08:00
void Server::set_token(int32_t tokens)
2025-03-05 09:20:03 +08:00
{
tokens_ = tokens;
}
2025-03-04 15:17:10 +08:00
void Server::do_accept()
{
auto socket = std::make_shared<asio::ip::tcp::socket>(io_context_);
acceptor_.async_accept(*socket, [this, socket](const std::error_code& ec) {
if (!ec) {
auto endpoint = socket->remote_endpoint();
std::string client_key = endpoint.address().to_string() + ":" + std::to_string(endpoint.port());
std::unique_lock<std::mutex> lock(cli_mutex_);
client_map_[client_key] = std::make_shared<ClientCache>();
2025-04-14 22:49:10 +08:00
clients_.insert(std::make_pair(socket->remote_endpoint().address().to_string(),
std::thread([this, socket, client_key]() { th_client(socket, client_key); })));
2025-03-04 15:17:10 +08:00
}
do_accept();
});
}
// Per-connection thread body.
//
// Reads bytes from `socket` into the client's cache, extracts complete frames
// with com_parse() and handles them one by one:
//  * if the used-token total exceeds the budget, replies TYPE_OUT_OF_LIMIT;
//  * a TYPE_REQUEST frame is forwarded through worker_->post(); the reply is
//    parsed and sent back as TYPE_RESPONSE_SUCCESS, or TYPE_RESPONSE_ERROR is
//    sent when the post fails. Requests from all clients are serialised by
//    ask_mutex_;
//  * frames of any other type are dropped.
// The loop ends when reading fails (including a clean close by the peer) or
// when an exception is thrown while handling data.
//
// @param socket      connected client socket
// @param client_key  "address:port" key of this client in client_map_/clients_
void Server::th_client(const std::shared_ptr<asio::ip::tcp::socket>& socket, const std::string& client_key)
{
    // Scope guard: the custom deleter runs when this function returns and
    // unregisters the client from both registries.
    std::shared_ptr<int> deleter(new int(0), [&](int* p) {
        std::unique_lock<std::mutex> lock(cli_mutex_);
        delete p;
        client_map_.erase(client_key);
        // NOTE(review): this lookup uses client_key; verify it matches the key
        // under which the thread handle was stored in clients_.
        auto self = clients_.find(client_key);
        if (self != clients_.end()) {
            // This is the handle of the current thread, so it cannot be
            // joined here; detach it. detach() throws on a non-joinable handle.
            if (self->second.joinable()) {
                self->second.detach();
            }
            clients_.erase(self);
        }
        std::cout << "th_client deleter client " << client_key << "exit." << std::endl;
    });

    std::shared_ptr<ClientCache> cache = nullptr;
    {
        std::unique_lock<std::mutex> lock(cli_mutex_);
        cache = client_map_[client_key];
    }
    if (!cache) {
        return;  // Not registered (or already unregistered): nothing to read into.
    }

    // A thread entry point must not leak exceptions (that calls
    // std::terminate), so everything below is fenced.
    try {
        asio::error_code error;
        while (true) {
            const auto len = socket->read_some(asio::buffer(cache->tmp_buf_), error);
            if (error) {
                break;  // Clean close by the peer (eof) or any other read error.
            }
            cache->buffer_.push(cache->tmp_buf_.data(), len);

            // Drain every complete frame currently buffered.
            while (auto* frame = com_parse(cache->buffer_)) {
                // Owns the frame for this iteration so it is deleted on every
                // path (including `continue` and exceptions). shared_ptr<void>
                // remembers the real pointer type for the delete.
                std::shared_ptr<void> frame_owner(frame);

                // NOTE(review): use_tokens_ is read here without ask_mutex_ but
                // updated under it below - confirm it is safe to read concurrently.
                if (use_tokens_ > tokens_) {
                    std::cout << client_key << " tokens not enough" << std::endl;
                    FrameData req;
                    req.type = FrameType::TYPE_OUT_OF_LIMIT;
                    send_frame(socket, req);
                    continue;
                }

                std::cout << client_key << " 's data." << std::endl;
                if (frame->type != FrameType::TYPE_REQUEST) {
                    continue;
                }

                // Released automatically at the end of the iteration, even if
                // post()/parse()/send_frame() throws.
                std::lock_guard<decltype(ask_mutex_)> ask_lock(ask_mutex_);

                std::string recv_data(frame->data, frame->len);
                std::string out{};
                if (!worker_->post(post_data(recv_data), out)) {
                    std::cout << client_key << " data post error" << std::endl;
                    FrameData req;
                    req.type = FrameType::TYPE_RESPONSE_ERROR;
                    send_frame(socket, req);
                } else {
                    auto parse = json_->parse(out);
                    FrameData req;
                    req.type = FrameType::TYPE_RESPONSE_SUCCESS;
                    req.len = parse.message_content.size();
                    // NOTE(review): this buffer is not freed here - presumably
                    // FrameData releases `data` itself; confirm, otherwise it leaks.
                    req.data = new char[req.len];
                    req.protk = parse.prompt_tokens;
                    req.coptk = parse.completion_tokens;
                    use_tokens_ += req.protk;
                    use_tokens_ += req.coptk;
                    std::cout << "Already use " << use_tokens_ << " tokens.\n";
                    memcpy(req.data, parse.message_content.c_str(), parse.message_content.size());
                    send_frame(socket, req);
                }
            }
        }
    } catch (const std::exception& e) {
        print_exception(e);
    }
}
2025-03-04 19:46:29 +08:00
// Turns the raw text received from a client into the request body to post,
// by delegating to json_->format_request().
std::string Server::post_data(const std::string& data)
{
    std::string request_body = json_->format_request(data);
    return request_body;
}
// Serialises `data` with com_pack() and writes the whole packet to `socket`.
//
// The non-throwing send overload is used, so a broken connection is reported
// through the return value instead of an exception, and the packed buffer is
// always released. send() may transmit only part of the buffer, so it is
// repeated until everything is written or an error occurs.
//
// @param socket  connected client socket
// @param data    frame to serialise and send
// @return true when the frame was packed and every byte was sent
bool Server::send_frame(const std::shared_ptr<asio::ip::tcp::socket>& socket, FrameData& data)
{
    char* send_data{};
    int len{};
    if (!com_pack(&data, &send_data, len)) {
        return false;
    }

    // Takes over the buffer produced by com_pack(); freed with delete[] on
    // every path out of this function.
    std::unique_ptr<char[]> packed(send_data);
    if (len < 0) {
        return false;
    }

    const std::size_t total = static_cast<std::size_t>(len);
    std::size_t sent = 0;
    asio::error_code error;
    while (sent < total) {
        const std::size_t written = socket->send(asio::buffer(packed.get() + sent, total - sent), 0, error);
        if (error || written == 0) {
            break;
        }
        sent += written;
    }

    if (error) {
        std::cerr << "send_frame: " << error.message() << '\n';
    }
    return sent == total;
}