rpc : use std::unique_ptr for the message_queue

This commit is contained in:
Georgi Gerganov
2026-01-06 15:32:01 +02:00
parent df27d80ae3
commit 091d98e2c5

View File

@@ -507,23 +507,23 @@ class message_queue {
public:
message_queue() {}
bool push(const T &value) {
bool push(T value) {
std::unique_lock<std::mutex> lock(mutex);
if (interrupted) {
return false;
}
queue.push(value);
queue.push(std::move(value));
cvar.notify_all();
return true;
}
bool pop(T* out) {
bool pop(T& out) {
std::unique_lock<std::mutex> lock(mutex);
cvar.wait(lock, [this] { return !queue.empty() || interrupted; });
if (interrupted) {
return false;
}
*out = queue.front();
out = std::move(queue.front());
queue.pop();
return true;
}
@@ -572,7 +572,7 @@ private:
size_t output_size;
std::promise<void> completion;
};
using rpc_msg_ptr = std::shared_ptr<rpc_msg>;
using rpc_msg_ptr = std::unique_ptr<rpc_msg>;
using rpc_msg_queue = message_queue<rpc_msg_ptr>;
struct rpc_event {
rpc_msg_ptr msg;
@@ -590,7 +590,7 @@ static void rpc_dispatcher_trampoline(rpc_dispatcher * dispatcher)
}
void rpc_dispatcher::send(enum rpc_cmd cmd, std::shared_ptr<const void> input, size_t input_size) {
auto msg = std::make_shared<rpc_msg>();
auto msg = std::make_unique<rpc_msg>();
msg->cmd = cmd;
msg->input = input;
msg->input_size = input_size;
@@ -602,7 +602,7 @@ void rpc_dispatcher::send(enum rpc_cmd cmd, std::shared_ptr<const void> input, s
}
void rpc_dispatcher::send_async(enum rpc_cmd cmd, std::shared_ptr<const void> input, size_t input_size) {
auto msg = std::make_shared<rpc_msg>();
auto msg = std::make_unique<rpc_msg>();
msg->cmd = cmd;
msg->input = input;
msg->input_size = input_size;
@@ -612,7 +612,7 @@ void rpc_dispatcher::send_async(enum rpc_cmd cmd, std::shared_ptr<const void> in
}
void rpc_dispatcher::send(enum rpc_cmd cmd, std::shared_ptr<const void> input, size_t input_size, void * output, size_t output_size) {
auto msg = std::make_shared<rpc_msg>();
auto msg = std::make_unique<rpc_msg>();
msg->cmd = cmd;
msg->input = input;
msg->input_size = input_size;
@@ -624,7 +624,7 @@ void rpc_dispatcher::send(enum rpc_cmd cmd, std::shared_ptr<const void> input, s
}
void rpc_dispatcher::send_async(enum rpc_cmd cmd, std::shared_ptr<const void> input, size_t input_size, void * output, size_t output_size) {
auto msg = std::make_shared<rpc_msg>();
auto msg = std::make_unique<rpc_msg>();
msg->cmd = cmd;
msg->input = input;
msg->input_size = input_size;
@@ -635,7 +635,7 @@ void rpc_dispatcher::send_async(enum rpc_cmd cmd, std::shared_ptr<const void> in
ggml_backend_event_t rpc_dispatcher::event_new(ggml_backend_dev_t dev) {
rpc_event * ev = new rpc_event;
ev->msg = std::make_shared<rpc_msg>();
ev->msg = std::make_unique<rpc_msg>();
ev->msg->cmd = RPC_CMD_NONE;
ev->sf = ev->msg->completion.get_future().share();
GGML_ASSERT(queue.push(ev->msg));
@@ -657,7 +657,7 @@ void rpc_dispatcher::event_synchronize(ggml_backend_event_t event) {
void rpc_dispatcher::event_record(ggml_backend_event_t event) {
rpc_event * ev = (rpc_event *)event->context;
ev->msg = std::make_shared<rpc_msg>();
ev->msg = std::make_unique<rpc_msg>();
ev->msg->cmd = RPC_CMD_NONE;
ev->sf = ev->msg->completion.get_future().share();
GGML_ASSERT(queue.push(ev->msg));
@@ -665,7 +665,7 @@ void rpc_dispatcher::event_record(ggml_backend_event_t event) {
void rpc_dispatcher::synchronize() {
// to ensure all messages are processed, submit dummy message and wait for it to complete
auto msg = std::make_shared<rpc_msg>();
auto msg = std::make_unique<rpc_msg>();
msg->cmd = RPC_CMD_NONE;
GGML_ASSERT(queue.push(msg));
msg->completion.get_future().wait();
@@ -715,7 +715,7 @@ void rpc_dispatcher::start(const std::string & endpoint) {
void rpc_dispatcher::work() {
while (running) {
rpc_msg_ptr msg_ptr;
if (!queue.pop(&msg_ptr)) {
if (!queue.pop(msg_ptr)) {
break;
}
if (msg_ptr->cmd != RPC_CMD_NONE) {