Compare commits

...

16 Commits

Author SHA1 Message Date
Ruben Ortlam
4cabbe36e0 state 2026-04-09 13:00:31 +02:00
Ruben Ortlam
9f001cae27 state 2026-04-09 12:51:43 +02:00
Ruben Ortlam
88335c0490 state 2026-04-09 12:39:51 +02:00
Ruben Ortlam
204023c897 state 2026-04-09 12:36:15 +02:00
Ruben Ortlam
d88d722fc1 state 2026-04-09 12:32:08 +02:00
Ruben Ortlam
96d9516329 state 2026-04-09 12:25:27 +02:00
Ruben Ortlam
8a108eddb4 state 2026-04-09 12:05:15 +02:00
Ruben Ortlam
47dde34e00 state 2026-04-09 11:58:46 +02:00
Ruben Ortlam
8d0e158076 state 2026-04-09 11:51:39 +02:00
Ruben Ortlam
aade0f81dd state 2026-04-09 11:42:50 +02:00
Ruben Ortlam
700270239d state 2026-04-09 11:24:21 +02:00
Ruben Ortlam
ddaafa3dc1 state 2026-04-09 11:11:17 +02:00
Ruben Ortlam
e5e0be0add state 2026-04-09 11:00:36 +02:00
Ruben Ortlam
3c4eae7dc9 state 2026-04-09 07:50:05 +02:00
Ruben Ortlam
7e2799c8c9 state 2026-04-09 07:40:02 +02:00
Ruben Ortlam
cd0722594a state 2026-04-09 07:25:33 +02:00

View File

@@ -40,6 +40,7 @@ DispatchLoaderDynamic & ggml_vk_default_dispatcher();
#include <future>
#include <thread>
#if defined(_MSC_VER)
# define NOMINMAX 1
# include <windows.h>
@@ -580,6 +581,22 @@ static constexpr std::initializer_list<std::array<int, 3>> rms_norm_mul_rope_vie
};
// One staging allocation used for a single cross-device (peer) copy:
// the same host allocation is imported as a Vulkan buffer on BOTH the
// source and the destination device, so each side can copy to/from it.
struct vk_peer_copy_buf {
    void * host_ptr = nullptr;  // aligned host allocation backing both imports (freed with _aligned_free/free)
    size_t host_size = 0;       // allocated size in bytes (rounded up to the import alignment)
    vk_buffer src_buf; // host_ptr imported into source device
    vk_buffer dst_buf; // host_ptr imported into dest device
};
// Per destination-device staging state kept on the source device
// (see vk_device_struct::peer_staging, keyed by the peer device).
struct vk_peer_staging {
    std::vector<vk_peer_copy_buf> bufs; // per-copy buffer pool
    size_t buf_idx = 0; // next pool slot to hand out; reset between iterations
    // timeline semaphore on source device for hop1 synchronization
    vk::Semaphore tl_sem;               // lazily created on first peer copy
    uint64_t tl_sem_value = 0;          // last signaled/awaited timeline value
};
struct vk_device_struct {
std::recursive_mutex mutex;
@@ -857,6 +874,8 @@ struct vk_device_struct {
vk::Fence fence;
vk_buffer sync_staging;
std::map<vk_device_struct *, vk_peer_staging> peer_staging;
ggml_backend_buffer_type buffer_type;
bool disable_fusion;
@@ -871,6 +890,24 @@ struct vk_device_struct {
device.destroyFence(fence);
for (auto& [peer, staging] : peer_staging) {
if (staging.tl_sem) {
device.destroySemaphore(staging.tl_sem);
}
for (auto& buf : staging.bufs) {
buf.src_buf.reset();
buf.dst_buf.reset();
if (buf.host_ptr) {
#if defined(_MSC_VER) || defined(__MINGW32__)
_aligned_free(buf.host_ptr);
#else
free(buf.host_ptr);
#endif
}
}
}
peer_staging.clear();
ggml_vk_destroy_buffer(sync_staging);
compute_queue.cmd_pool.destroy(device);
@@ -1651,7 +1688,6 @@ typedef std::weak_ptr<vk_context_struct> vk_context_ref;
// Per-context collection of Vulkan objects whose destruction is deferred
// to graph cleanup (semaphores are destroyed in ggml_vk_graph_cleanup).
struct ggml_vk_garbage_collector {
    std::vector<vk_semaphore> tl_semaphores; // timeline semaphores
    std::vector<vk_semaphore> semaphores;    // binary semaphores
    std::vector<vk::Event> events;
    std::vector<vk_context> contexts;
};
@@ -2493,15 +2529,6 @@ static vk_context ggml_vk_create_temporary_context(vk_command_pool& p) {
return result;
}
// Create a binary semaphore on the context's device and register it with the
// garbage collector so it is destroyed in ggml_vk_graph_cleanup().
// Returns a pointer to the tracked entry (the .value field is unused for
// binary semaphores and stays 0).
// NOTE(review): the returned pointer aliases a std::vector element — a later
// push_back into gc.semaphores may reallocate and invalidate pointers
// returned by earlier calls; callers must consume the pointer immediately.
static vk_semaphore * ggml_vk_create_binary_semaphore(ggml_backend_vk_context * ctx) {
    // Fixed: the debug message previously said "ggml_vk_create_timeline_semaphore()"
    // (copy-paste from the timeline variant).
    VK_LOG_DEBUG("ggml_vk_create_binary_semaphore()");
    // eBinary is the Vulkan default, but spelling it out mirrors
    // ggml_vk_create_timeline_semaphore().
    vk::SemaphoreTypeCreateInfo tci{ vk::SemaphoreType::eBinary, 0 };
    vk::SemaphoreCreateInfo ci{};
    ci.setPNext(&tci);
    vk::Semaphore semaphore = ctx->device->device.createSemaphore(ci);
    ctx->gc.semaphores.push_back({ semaphore, 0 });
    return &ctx->gc.semaphores.back();
}
static vk_semaphore * ggml_vk_create_timeline_semaphore(ggml_backend_vk_context * ctx) {
VK_LOG_DEBUG("ggml_vk_create_timeline_semaphore()");
@@ -13331,10 +13358,14 @@ static void ggml_vk_graph_cleanup(ggml_backend_vk_context * ctx) {
ggml_vk_command_pool_cleanup(ctx->device, ctx->transfer_cmd_pool);
}
for (size_t i = 0; i < ctx->gc.semaphores.size(); i++) {
ctx->device->device.destroySemaphore({ ctx->gc.semaphores[i].s });
// Reset device-level command pools used by cross-device hop1 temporary contexts
if (!ctx->device->peer_staging.empty()) {
ggml_vk_queue_command_pools_cleanup(ctx->device);
}
for (auto& [peer, staging] : ctx->device->peer_staging) {
staging.buf_idx = 0;
}
ctx->gc.semaphores.clear();
for (size_t i = 0; i < ctx->gc.tl_semaphores.size(); i++) {
ctx->device->device.destroySemaphore({ ctx->gc.tl_semaphores[i].s });
@@ -13757,6 +13788,112 @@ static void ggml_backend_vk_get_tensor_async(ggml_backend_t backend, const ggml_
}
}
static vk_buffer ggml_vk_buffer_from_host_ptr(vk_device & device, void * ptr, size_t size);
// Tear down one peer-copy staging entry: drop both device-side imports,
// then release the host allocation and clear the bookkeeping fields.
static void ggml_vk_free_peer_copy_buf(vk_peer_copy_buf& buf) {
    // The imported buffers must go away before the host memory they wrap.
    buf.src_buf.reset();
    buf.dst_buf.reset();

    if (buf.host_ptr == nullptr) {
        return;
    }
#if defined(_MSC_VER) || defined(__MINGW32__)
    _aligned_free(buf.host_ptr);
#else
    free(buf.host_ptr);
#endif
    buf.host_ptr  = nullptr;
    buf.host_size = 0;
}
// Allocate a host allocation of at least `required_size` bytes, aligned for
// host-pointer import on both devices, and import it into each device as a
// Vulkan buffer. On success `buf` owns the allocation and both imports and
// true is returned; on any failure everything is released, `buf` is left
// untouched, and false is returned.
static bool ggml_vk_alloc_peer_copy_buf(vk_device& src_dev, vk_device& dst_dev,
                                        vk_peer_copy_buf& buf, size_t required_size) {
    // The import must satisfy the stricter of the two devices' alignment
    // requirements (both are powers of two per the Vulkan spec, so max works).
    uint64_t alignment = std::max(src_dev->min_imported_host_pointer_alignment,
                                  dst_dev->min_imported_host_pointer_alignment);
    if (alignment == 0) {
        // Defensive fallback if a driver reports no requirement.
        alignment = 4096;
    }
    // Size is rounded up too, since imported ranges are alignment-granular.
    const size_t alloc_size = CEIL_DIV(required_size, alignment) * alignment;

    // Platform-specific aligned free, shared by all failure paths below
    // (previously duplicated three times).
    auto aligned_host_free = [](void * p) {
#if defined(_MSC_VER) || defined(__MINGW32__)
        _aligned_free(p);
#else
        free(p);
#endif
    };

    void * host_ptr = nullptr;
#if defined(_MSC_VER) || defined(__MINGW32__)
    host_ptr = _aligned_malloc(alloc_size, (size_t)alignment);
#else
    if (posix_memalign(&host_ptr, (size_t)alignment, alloc_size) != 0) {
        host_ptr = nullptr;
    }
#endif
    if (!host_ptr) {
        return false;
    }

    vk_buffer src_buf = ggml_vk_buffer_from_host_ptr(src_dev, host_ptr, alloc_size);
    if (!src_buf) {
        aligned_host_free(host_ptr);
        return false;
    }

    vk_buffer dst_buf = ggml_vk_buffer_from_host_ptr(dst_dev, host_ptr, alloc_size);
    if (!dst_buf) {
        src_buf.reset();
        aligned_host_free(host_ptr);
        return false;
    }

    buf = { host_ptr, alloc_size, std::move(src_buf), std::move(dst_buf) };
    return true;
}
// Hand out the next staging buffer for a copy from src_dev to dst_dev,
// growing (or re-growing) the pool entry if it cannot hold `required_size`
// bytes. Creates the per-peer staging record and its timeline semaphore on
// first use. Returns nullptr if host-pointer import is unsupported on either
// device or allocation fails.
static vk_peer_copy_buf * ggml_vk_get_peer_copy_buf(vk_device& src_dev, vk_device& dst_dev,
                                                    size_t required_size) {
    // Both sides need VK_EXT_external_memory_host for the shared staging trick.
    if (!(src_dev->external_memory_host && dst_dev->external_memory_host)) {
        return nullptr;
    }

    vk_peer_staging & staging = src_dev->peer_staging[dst_dev.get()];

    // First copy to this peer: create the hop1 timeline semaphore.
    if (!staging.tl_sem) {
        vk::SemaphoreTypeCreateInfo type_info{ vk::SemaphoreType::eTimeline, 0 };
        vk::SemaphoreCreateInfo create_info{};
        create_info.setPNext(&type_info);
        staging.tl_sem = src_dev->device.createSemaphore(create_info);
    }

    const size_t idx = staging.buf_idx;
    if (idx >= staging.bufs.size()) {
        // Pool exhausted: append a fresh slot and allocate into it.
        staging.bufs.emplace_back();
        if (!ggml_vk_alloc_peer_copy_buf(src_dev, dst_dev, staging.bufs.back(), required_size)) {
            staging.bufs.pop_back();
            return nullptr;
        }
    } else if (staging.bufs[idx].host_size < required_size) {
        // Pooled slot is too small: replace its allocation.
        ggml_vk_free_peer_copy_buf(staging.bufs[idx]);
        if (!ggml_vk_alloc_peer_copy_buf(src_dev, dst_dev, staging.bufs[idx], required_size)) {
            return nullptr;
        }
    }

    staging.buf_idx = idx + 1;
    return &staging.bufs[idx];
}
static bool ggml_backend_vk_cpy_tensor_async(ggml_backend_t backend_src, ggml_backend_t backend_dst, const ggml_tensor * src, ggml_tensor * dst) {
VK_LOG_DEBUG("ggml_backend_vk_cpy_tensor_async(" << src << " -> " << dst << ", size=" << ggml_nbytes(src) << ")");
ggml_backend_vk_context * ctx = (ggml_backend_vk_context *)backend_dst->context;
@@ -13776,9 +13913,66 @@ static bool ggml_backend_vk_cpy_tensor_async(ggml_backend_t backend_src, ggml_ba
if (ggml_backend_buffer_is_vk(src->buffer)) {
ggml_backend_vk_buffer_context * src_buf_ctx = (ggml_backend_vk_buffer_context *)src->buffer->context;
// Async copy only works within the same device
if (src_buf_ctx->dev_buffer->device != dst_buf->device) {
return false;
// Cross-device copy via per-copy staging buffer
ggml_backend_vk_context * src_ctx = (ggml_backend_vk_context *)backend_src->context;
vk_device src_dev = src_ctx->device;
vk_device dst_dev = ctx->device;
size_t nbytes = ggml_nbytes(src);
vk_buffer src_vk_buf = src_buf_ctx->dev_buffer;
size_t src_offset = vk_tensor_offset(src) + src->view_offs;
size_t dst_offset = vk_tensor_offset(dst) + dst->view_offs;
vk_peer_copy_buf * copy_buf = ggml_vk_get_peer_copy_buf(src_dev, dst_dev, nbytes);
if (!copy_buf) {
return false;
}
auto& staging = src_dev->peer_staging[dst_dev.get()];
// HOP 1: src VRAM → staging (on source compute queue)
// Implicit queue submission ordering guarantees this
// executes after all prior compute work.
vk_context hop1_ctx;
{
std::lock_guard<std::recursive_mutex> guard(src_dev->mutex);
hop1_ctx = ggml_vk_create_temporary_context(src_dev->compute_queue.cmd_pool);
ggml_vk_ctx_begin(src_dev, hop1_ctx);
VkBufferCopy bc{ src_offset, 0, nbytes };
vkCmdCopyBuffer(hop1_ctx->s->buffer->buf,
(VkBuffer)src_vk_buf->buffer,
(VkBuffer)copy_buf->src_buf->buffer,
1, &bc);
ggml_vk_ctx_end(hop1_ctx);
}
// Submit hop1, signal timeline semaphore, CPU wait
staging.tl_sem_value++;
hop1_ctx->seqs.back().back().signal_semaphores.push_back(
{ staging.tl_sem, staging.tl_sem_value });
ggml_vk_submit(hop1_ctx, {});
src_ctx->submit_pending = true;
vk::SemaphoreWaitInfo swi{
vk::SemaphoreWaitFlags{},
1, &staging.tl_sem, &staging.tl_sem_value
};
VK_CHECK(src_dev->device.waitSemaphores(swi, UINT64_MAX),
"cross_device_hop1 waitSemaphores");
// HOP 2: staging → dst VRAM (on dest device)
vk_context dst_compute_ctx = ggml_vk_get_compute_ctx(ctx);
VkBufferCopy bc2{ 0, dst_offset, nbytes };
vkCmdCopyBuffer(dst_compute_ctx->s->buffer->buf,
(VkBuffer)copy_buf->dst_buf->buffer,
(VkBuffer)dst_buf->buffer,
1, &bc2);
return true;
}
vk_context compute_ctx = ggml_vk_get_compute_ctx(ctx);
@@ -13815,7 +14009,6 @@ static bool ggml_backend_vk_cpy_tensor_async(ggml_backend_t backend_src, ggml_ba
src->data, ggml_nbytes(src));
}
GGML_UNUSED(backend_src);
return false;
}