|
7 | 7 | // in the same context. |
8 | 8 |
|
9 | 9 | #include <cstring> |
| 10 | +#include <mutex> |
| 11 | +#include <thread> |
10 | 12 |
|
11 | 13 | #include "uur/fixtures.h" |
12 | 14 |
|
@@ -301,3 +303,196 @@ TEST_P(urMultiDeviceContextMemBufferTest, WriteKernelKernelRead) { |
301 | 303 | ASSERT_EQ(a, fill_val + 2); |
302 | 304 | } |
303 | 305 | } |
| 306 | + |
| 307 | +TEST_P(urMultiDeviceContextMemBufferTest, PingPongKernelExecution) { |
| 308 | + if (num_devices <= 1) { |
| 309 | + GTEST_SKIP(); |
| 310 | + } |
| 311 | + |
| 312 | + // Setup kernels for alternating devices |
| 313 | + AddBuffer1DArg(0, 0, buffer); |
| 314 | + AddBuffer1DArg(1, 0, buffer); |
| 315 | + |
| 316 | + T fill_val = 50; |
| 317 | + std::vector<T> in_vec(buffer_size, fill_val); |
| 318 | + std::vector<T> out_vec(buffer_size); |
| 319 | + |
| 320 | + const uint32_t ping_pong_iterations = 20; |
| 321 | + std::vector<ur_event_handle_t> events(ping_pong_iterations); |
| 322 | + ur_event_handle_t write_event, read_event; |
| 323 | + |
| 324 | + size_t work_dims[3] = {buffer_size, 1, 1}; |
| 325 | + size_t offset[3] = {0, 0, 0}; |
| 326 | + |
| 327 | + ASSERT_SUCCESS(urEnqueueMemBufferWrite(queues[0], buffer, false, 0, |
| 328 | + buffer_size_bytes, in_vec.data(), 0, |
| 329 | + nullptr, &write_event)); |
| 330 | + |
| 331 | + // Ping-pong kernel execution across two devices |
| 332 | + // Each kernel increments the buffer values by 1 |
| 333 | + for (uint32_t i = 0; i < ping_pong_iterations; ++i) { |
| 334 | + uint32_t device_idx = i % 2; |
| 335 | + ur_event_handle_t *wait_event = (i == 0) ? &write_event : &events[i - 1]; |
| 336 | + |
| 337 | + ASSERT_SUCCESS(urEnqueueKernelLaunchWithArgsExp( |
| 338 | + queues[device_idx], kernels[device_idx], 1, offset, work_dims, nullptr, |
| 339 | + static_cast<uint32_t>(kernel_args[device_idx].size()), |
| 340 | + kernel_args[device_idx].data(), nullptr, 1, wait_event, &events[i])); |
| 341 | + } |
| 342 | + |
| 343 | + ASSERT_SUCCESS(urEnqueueMemBufferRead( |
| 344 | + queues[0], buffer, false, 0, buffer_size_bytes, out_vec.data(), 1, |
| 345 | + &events[ping_pong_iterations - 1], &read_event)); |
| 346 | + |
| 347 | + ASSERT_SUCCESS(urEventWait(1, &read_event)); |
| 348 | + |
| 349 | + // Verify final result |
| 350 | + for (auto &a : out_vec) { |
| 351 | + ASSERT_EQ(a, fill_val + ping_pong_iterations); |
| 352 | + } |
| 353 | +} |
| 354 | + |
| 355 | +TEST_P(urMultiDeviceContextMemBufferTest, ComplexMigrationPattern) { |
| 356 | + if (num_devices <= 1) { |
| 357 | + GTEST_SKIP(); |
| 358 | + } |
| 359 | + |
| 360 | + AddBuffer1DArg(1, 0, buffer); |
| 361 | + |
| 362 | + T fill_val = 200; |
| 363 | + std::vector<T> in_vec(buffer_size, fill_val); |
| 364 | + std::vector<T> intermediate_vec(buffer_size); |
| 365 | + std::vector<T> out_vec(buffer_size); |
| 366 | + |
| 367 | + ur_event_handle_t write_event, intermediate_read_event, |
| 368 | + intermediate_write_event, final_kernel_event, read_event; |
| 369 | + |
| 370 | + size_t work_dims[3] = {buffer_size, 1, 1}; |
| 371 | + size_t offset[3] = {0, 0, 0}; |
| 372 | + |
| 373 | + // Write on device 0, execute kernel on device 1 |
| 374 | + ASSERT_SUCCESS(urEnqueueMemBufferWrite(queues[0], buffer, false, 0, |
| 375 | + buffer_size_bytes, in_vec.data(), 0, |
| 376 | + nullptr, &write_event)); |
| 377 | + |
| 378 | + ur_event_handle_t phase1_kernel_event; |
| 379 | + ASSERT_SUCCESS(urEnqueueKernelLaunchWithArgsExp( |
| 380 | + queues[1], kernels[1], 1, offset, work_dims, nullptr, |
| 381 | + static_cast<uint32_t>(kernel_args[1].size()), kernel_args[1].data(), |
| 382 | + nullptr, 1, &write_event, &phase1_kernel_event)); |
| 383 | + |
| 384 | + // Read intermediate result back to host |
| 385 | + ASSERT_SUCCESS(urEnqueueMemBufferRead( |
| 386 | + queues[0], buffer, false, 0, buffer_size_bytes, intermediate_vec.data(), |
| 387 | + 1, &phase1_kernel_event, &intermediate_read_event)); |
| 388 | + |
| 389 | + // Modify on host and write back via the same device that performed the read |
| 390 | + // (queues[0]). This ensures a coherent ownership chain: the device that last |
| 391 | + // read the buffer writes the updated data back, so the subsequent kernel |
| 392 | + // launch on device 1 observes the correct value after migration. |
| 393 | + ASSERT_SUCCESS(urEventWait(1, &intermediate_read_event)); |
| 394 | + for (auto &val : intermediate_vec) { |
| 395 | + val += 10; |
| 396 | + } |
| 397 | + |
| 398 | + ASSERT_SUCCESS(urEnqueueMemBufferWrite( |
| 399 | + queues[0], buffer, false, 0, buffer_size_bytes, intermediate_vec.data(), |
| 400 | + 0, nullptr, &intermediate_write_event)); |
| 401 | + |
| 402 | + // Final kernel execution on device 1, triggering migration from device 0 |
| 403 | + ASSERT_SUCCESS(urEnqueueKernelLaunchWithArgsExp( |
| 404 | + queues[1], kernels[1], 1, offset, work_dims, nullptr, |
| 405 | + static_cast<uint32_t>(kernel_args[1].size()), kernel_args[1].data(), |
| 406 | + nullptr, 1, &intermediate_write_event, &final_kernel_event)); |
| 407 | + |
| 408 | + // Final read via device 0 |
| 409 | + ASSERT_SUCCESS(urEnqueueMemBufferRead(queues[0], buffer, false, 0, |
| 410 | + buffer_size_bytes, out_vec.data(), 1, |
| 411 | + &final_kernel_event, &read_event)); |
| 412 | + |
| 413 | + ASSERT_SUCCESS(urEventWait(1, &read_event)); |
| 414 | + |
| 415 | + // Verify result: initial + 1 (phase1) + 10 (host) + 1 (final) = initial + 12 |
| 416 | + for (auto &a : out_vec) { |
| 417 | + ASSERT_EQ(a, fill_val + 12); |
| 418 | + } |
| 419 | +} |
| 420 | + |
| 421 | +TEST_P(urMultiDeviceContextMemBufferTest, KernelsExecutionWithThreads) { |
| 422 | + if (num_devices <= 1) { |
| 423 | + GTEST_SKIP(); |
| 424 | + } |
| 425 | + |
| 426 | + AddBuffer1DArg(0, 0, buffer); |
| 427 | + AddBuffer1DArg(1, 0, buffer); |
| 428 | + |
| 429 | + T fill_val = 100; |
| 430 | + std::vector<T> in_vec(buffer_size, fill_val); |
| 431 | + std::vector<T> out_vec(buffer_size); |
| 432 | + |
| 433 | + const uint32_t thread_count = 8; |
| 434 | + const uint32_t iterations_per_thread = 10; |
| 435 | + std::vector<std::thread> threads(thread_count); |
| 436 | + ur_event_handle_t write_event, read_event; |
| 437 | + |
| 438 | + size_t work_dims[3] = {buffer_size, 1, 1}; |
| 439 | + size_t offset[3] = {0, 0, 0}; |
| 440 | + |
| 441 | + // Create a dedicated queue per thread so that concurrent kernel submissions |
| 442 | + // from different threads go through independent queues. This exercises the |
| 443 | + // runtime's ability to handle simultaneous multi-queue access to the same |
| 444 | + // buffer, which is the primary intent of this migration test. |
| 445 | + std::vector<ur_queue_handle_t> thread_queues(thread_count); |
| 446 | + for (auto t = 0u; t < thread_count; ++t) { |
| 447 | + ASSERT_SUCCESS( |
| 448 | + urQueueCreate(context, devices[t % 2], nullptr, &thread_queues[t])); |
| 449 | + } |
| 450 | + |
| 451 | + ASSERT_SUCCESS(urEnqueueMemBufferWrite(queues[0], buffer, false, 0, |
| 452 | + buffer_size_bytes, in_vec.data(), 0, |
| 453 | + nullptr, &write_event)); |
| 454 | + |
| 455 | + // A shared last-event and its lock ensure a total ordering across all kernel |
| 456 | + // submissions from all threads. Each thread reads the current last event as |
| 457 | + // its dependency, submits its kernel, then updates the shared last event |
| 458 | + // before releasing the lock. This guarantees no two kernels race on the |
| 459 | + // buffer while still exercising concurrent multi-queue submission paths. |
| 460 | + std::mutex last_event_mutex; |
| 461 | + ur_event_handle_t shared_last_event = write_event; |
| 462 | + |
| 463 | + for (auto t = 0u; t < thread_count; ++t) { |
| 464 | + threads[t] = std::thread([&, t]() { |
| 465 | + const uint32_t device_idx = t % 2; |
| 466 | + for (auto i = 0u; i < iterations_per_thread; ++i) { |
| 467 | + ur_event_handle_t kernel_event; |
| 468 | + std::unique_lock<std::mutex> lock(last_event_mutex); |
| 469 | + ur_event_handle_t dep_event = shared_last_event; |
| 470 | + ASSERT_SUCCESS(urEnqueueKernelLaunchWithArgsExp( |
| 471 | + thread_queues[t], kernels[device_idx], 1, offset, work_dims, |
| 472 | + nullptr, static_cast<uint32_t>(kernel_args[device_idx].size()), |
| 473 | + kernel_args[device_idx].data(), nullptr, 1, &dep_event, |
| 474 | + &kernel_event)); |
| 475 | + shared_last_event = kernel_event; |
| 476 | + } |
| 477 | + }); |
| 478 | + } |
| 479 | + for (auto &th : threads) { |
| 480 | + th.join(); |
| 481 | + } |
| 482 | + |
| 483 | + // Enqueue read after all threads have finished submitting, waiting on the |
| 484 | + // last event in the global chain. |
| 485 | + ASSERT_SUCCESS(urEnqueueMemBufferRead(queues[0], buffer, false, 0, |
| 486 | + buffer_size_bytes, out_vec.data(), 1, |
| 487 | + &shared_last_event, &read_event)); |
| 488 | + |
| 489 | + ASSERT_SUCCESS(urEventWait(1, &read_event)); |
| 490 | + |
| 491 | + for (auto t = 0u; t < thread_count; ++t) { |
| 492 | + urQueueRelease(thread_queues[t]); |
| 493 | + } |
| 494 | + |
| 495 | + for (auto &a : out_vec) { |
| 496 | + ASSERT_EQ(a, fill_val + thread_count * iterations_per_thread); |
| 497 | + } |
| 498 | +} |
0 commit comments