-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy pathinstruction_graph_misc_tests.cc
More file actions
730 lines (600 loc) · 39.4 KB
/
instruction_graph_misc_tests.cc
File metadata and controls
730 lines (600 loc) · 39.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
#include <catch2/catch_template_test_macros.hpp>
#include <catch2/catch_test_macros.hpp>
#include <catch2/generators/catch_generators_range.hpp>
#include <catch2/matchers/catch_matchers.hpp>
#include "instruction_graph_test_utils.h"
#include "test_utils.h"
using namespace celerity;
using namespace celerity::detail;
using namespace celerity::experimental;
namespace acc = celerity::access;
TEST_CASE("a command group without data access compiles to a trivial graph", "[instruction_graph_generator][instruction-graph]") {
test_utils::idag_test_context ictx(1 /* nodes */, 0 /* my nid */, 1 /* devices */);
const range<1> test_range = {256};
ictx.device_compute(test_range).name("kernel").submit();
ictx.finish();
const auto all_instrs = ictx.query_instructions();
CHECK(all_instrs.count() == 3);
CHECK(all_instrs.count<epoch_instruction_record>() == 2);
CHECK(all_instrs.count<device_kernel_instruction_record>() == 1);
const auto kernel = all_instrs.select_unique<device_kernel_instruction_record>("kernel");
CHECK(kernel->access_map.empty());
CHECK(kernel->execution_range == box(subrange<3>(zeros, range_cast<3>(test_range))));
CHECK(kernel->device_id == device_id(0));
CHECK(kernel.predecessors().is_unique<epoch_instruction_record>());
CHECK(kernel.successors().is_unique<epoch_instruction_record>());
}
TEST_CASE("side-effects introduce dependencies between host-task instructions", "[instruction_graph_generator][instruction-graph]") {
test_utils::idag_test_context ictx(1 /* nodes */, 0 /* my nid */, 1 /* devices */);
ictx.set_horizon_step(999);
auto ho1 = ictx.create_host_object();
auto ho2 = ictx.create_host_object(false /* owns instance */);
ictx.master_node_host_task().name("affect ho1 (a)").affect(ho1).submit();
ictx.master_node_host_task().name("affect ho2 (a)").affect(ho2).submit();
ictx.master_node_host_task().name("affect ho1 (b)").affect(ho1).submit();
ictx.master_node_host_task().name("affect ho2 (b)").affect(ho2).submit();
ictx.master_node_host_task().name("affect ho1 + ho2").affect(ho1).affect(ho2).submit();
ictx.master_node_host_task().name("affect ho1 (c)").affect(ho1).submit();
ictx.master_node_host_task().name("affect ho2 (c)").affect(ho2).submit();
ictx.finish(); // destroys all live host objects
const auto all_instrs = ictx.query_instructions();
const auto affect_ho1_a = all_instrs.select_unique<host_task_instruction_record>("affect ho1 (a)");
const auto affect_ho2_a = all_instrs.select_unique<host_task_instruction_record>("affect ho2 (a)");
const auto affect_ho1_b = all_instrs.select_unique<host_task_instruction_record>("affect ho1 (b)");
const auto affect_ho2_b = all_instrs.select_unique<host_task_instruction_record>("affect ho2 (b)");
const auto affect_both = all_instrs.select_unique<host_task_instruction_record>("affect ho1 + ho2");
const auto affect_ho1_c = all_instrs.select_unique<host_task_instruction_record>("affect ho1 (c)");
const auto affect_ho2_c = all_instrs.select_unique<host_task_instruction_record>("affect ho2 (c)");
// only ho1 owns its instance, so only one destroy_host_object_instruction is emitted
const auto destroy_ho1 = all_instrs.select_unique<destroy_host_object_instruction_record>();
CHECK(destroy_ho1->host_object_id == ho1.get_id());
CHECK(affect_ho1_a.predecessors().is_unique<epoch_instruction_record>());
CHECK(affect_ho2_a.predecessors().is_unique<epoch_instruction_record>());
CHECK(affect_ho1_a.successors() == affect_ho1_b);
CHECK(affect_ho2_a.successors() == affect_ho2_b);
CHECK(affect_ho1_b.successors() == affect_both);
CHECK(affect_ho2_b.successors() == affect_both);
CHECK(affect_both.successors() == union_of(affect_ho1_c, affect_ho2_c));
CHECK(affect_ho1_c.successors() == destroy_ho1);
CHECK(destroy_ho1.successors().is_unique<epoch_instruction_record>());
CHECK(affect_ho2_c.successors().is_unique<epoch_instruction_record>());
}
TEST_CASE("collective-group instructions follow a single global total order", "[instruction_graph_generator][instruction-graph]") {
const auto local_nid = GENERATE(values<node_id>({0, 1}));
test_utils::idag_test_context ictx(2 /* nodes */, local_nid, 1 /* devices */);
ictx.set_horizon_step(999);
// collective-groups are not explicitly registered with graph generators, so both IDAG tests and the runtime use the same mechanism to declare them
experimental::collective_group custom_cg_1;
experimental::collective_group custom_cg_2;
ictx.collective_host_task().name("default-group (a)").submit();
ictx.collective_host_task().name("default-group (b)").submit();
ictx.collective_host_task(custom_cg_1).name("custom-group 1 (a)").submit();
ictx.collective_host_task(custom_cg_2).name("custom-group 2").submit();
ictx.collective_host_task(custom_cg_1).name("custom-group 1 (b)").submit();
ictx.collective_host_task().name("default-group (c)").submit();
ictx.finish();
const auto all_instrs = ictx.query_instructions();
const auto init_epoch = all_instrs.select_unique(ictx.get_initial_epoch_task());
// the default collective group does not use the default communicator (aka MPI_COMM_WORLD) because host tasks are executed on a different thread
const auto clone_for_default_group = init_epoch.successors().assert_unique<clone_collective_group_instruction_record>();
CHECK(clone_for_default_group->original_collective_group_id == root_collective_group_id);
CHECK(clone_for_default_group->new_collective_group_id != clone_for_default_group->original_collective_group_id);
const auto default_cgid = clone_for_default_group->new_collective_group_id;
const auto default_group_a = all_instrs.select_unique<host_task_instruction_record>("default-group (a)");
CHECK(default_group_a->collective_group_id == default_cgid);
CHECK(default_group_a.predecessors() == clone_for_default_group);
const auto default_group_b = all_instrs.select_unique<host_task_instruction_record>("default-group (b)");
CHECK(default_group_b->collective_group_id == default_cgid);
CHECK(default_group_b.predecessors() == default_group_a); // collective-group ordering
// even though "default-group (c)" is submitted last, it only depends on its predecessor in the same group
const auto default_group_c = all_instrs.select_unique<host_task_instruction_record>("default-group (c)");
CHECK(default_group_c->collective_group_id == default_cgid);
CHECK(default_group_c.predecessors() == default_group_b); // collective-group ordering
CHECK(default_group_c.successors().is_unique<epoch_instruction_record>());
// clone-collective-group instructions are ordered, because cloning an MPI communicator is a collective operation as well
const auto clone_for_custom_group_1 = clone_for_default_group.successors().select_unique<clone_collective_group_instruction_record>();
CHECK(clone_for_custom_group_1->original_collective_group_id == root_collective_group_id);
CHECK(clone_for_custom_group_1->new_collective_group_id != clone_for_custom_group_1->original_collective_group_id);
const auto custom_cgid_1 = clone_for_custom_group_1->new_collective_group_id;
CHECK(custom_cgid_1 != default_cgid);
const auto custom_group_1_a = all_instrs.select_unique<host_task_instruction_record>("custom-group 1 (a)");
CHECK(custom_group_1_a->collective_group_id == custom_cgid_1);
CHECK(custom_group_1_a.predecessors() == clone_for_custom_group_1);
const auto custom_group_1_b = all_instrs.select_unique<host_task_instruction_record>("custom-group 1 (b)");
CHECK(custom_group_1_b->collective_group_id == custom_cgid_1);
CHECK(custom_group_1_b.predecessors() == custom_group_1_a); // collective-group ordering
CHECK(custom_group_1_b.successors().is_unique<epoch_instruction_record>());
// clone-collective-group instructions are ordered, because cloning an MPI communicator is a collective operation as well
const auto clone_for_custom_group_2 = clone_for_custom_group_1.successors().select_unique<clone_collective_group_instruction_record>();
CHECK(clone_for_custom_group_2->original_collective_group_id == root_collective_group_id);
CHECK(clone_for_custom_group_2->new_collective_group_id != clone_for_custom_group_2->original_collective_group_id);
const auto custom_cgid_2 = clone_for_custom_group_2->new_collective_group_id;
CHECK(custom_cgid_2 != default_cgid);
CHECK(custom_cgid_2 != custom_cgid_1);
const auto custom_group_2 = all_instrs.select_unique<host_task_instruction_record>("custom-group 2");
CHECK(custom_group_2->collective_group_id == custom_cgid_2);
CHECK(custom_group_2.predecessors() == clone_for_custom_group_2);
CHECK(custom_group_2.successors().is_unique<epoch_instruction_record>());
}
TEMPLATE_TEST_CASE_SIG("buffer fences export data to user memory", "[instruction_graph_generator][instruction-graph][fence]", ((int Dims), Dims), 0, 1, 2, 3) {
constexpr static auto full_range = test_utils::truncate_range<Dims>({256, 256, 256});
const auto export_subrange = GENERATE(values({subrange<Dims>({}, full_range), test_utils::truncate_subrange<Dims>({{48, 64, 72}, {128, 128, 128}})}));
test_utils::idag_test_context ictx(1 /* nodes */, 0 /* my nid */, 1 /* devices */);
auto buf = ictx.create_buffer<int>(full_range);
ictx.device_compute(full_range).name("writer").discard_write(buf, acc::one_to_one()).submit();
ictx.fence(buf, export_subrange);
ictx.finish();
const auto all_instrs = ictx.query_instructions();
const auto fence = all_instrs.select_unique<fence_instruction_record>();
const auto& fence_buffer_info = std::get<fence_instruction_record::buffer_variant>(fence->variant);
CHECK(fence_buffer_info.bid == buf.get_id());
CHECK(fence_buffer_info.box.get_offset() == id_cast<3>(export_subrange.offset));
CHECK(fence_buffer_info.box.get_range() == range_cast<3>(export_subrange.range));
// copy to user memory
const auto fence_copy = fence.predecessors().assert_unique<copy_instruction_record>();
CHECK(fence_copy->buffer_id == buf.get_id());
CHECK(fence_copy->source_allocation_id.get_memory_id() == host_memory_id);
CHECK(fence_copy->dest_allocation_id.get_memory_id() == user_memory_id);
CHECK(fence_copy->dest_layout == region_layout(strided_layout(box_cast<3>(box(export_subrange)))));
CHECK(fence_copy->copy_region == region_cast<3>(region(box(export_subrange))));
CHECK(fence_copy->element_size == sizeof(int));
// exports are done from host memory, so data needs to be staged from device memory
const auto staging_copy = fence_copy.predecessors().assert_unique<copy_instruction_record>();
CHECK(staging_copy->dest_allocation_id == fence_copy->source_allocation_id);
CHECK(staging_copy->copy_region == fence_copy->copy_region);
CHECK(staging_copy->buffer_id == buf.get_id());
}
TEST_CASE("instruction_graph_generator gracefully handles empty-range buffer fences", "[instruction_graph_generator][instruction-graph][fence]") {
const auto local_nid = GENERATE(values<node_id>({0, 1}));
test_utils::idag_test_context ictx(2 /* nodes */, local_nid, 1 /* devices */);
auto buf = ictx.create_buffer<int>(range(256));
ictx.device_compute(buf.get_range()).discard_write(buf, acc::one_to_one()).submit();
ictx.fence(buf, subrange<1>());
ictx.finish();
const auto all_instrs = ictx.query_instructions();
const auto init_epoch = all_instrs.select_unique(ictx.get_initial_epoch_task());
const auto fence = all_instrs.select_unique<fence_instruction_record>();
const auto shutdown_epoch = all_instrs.select_unique<epoch_instruction_record>(
[](const epoch_instruction_record& einstr) { return einstr.epoch_action == epoch_action::shutdown; });
CHECK(fence.predecessors() == init_epoch);
CHECK(fence.successors() == shutdown_epoch);
const auto& garbage = shutdown_epoch->garbage;
CHECK(garbage.user_allocations.size() == 1);
CHECK(garbage.user_allocations.at(0) != null_allocation_id);
CHECK(garbage.user_allocations.at(0).get_memory_id() == user_memory_id);
CHECK(all_instrs.count<send_instruction_record>() == 0);
CHECK(all_instrs.count<receive_instruction_record>() == 0);
CHECK(all_instrs.count<split_receive_instruction_record>() == 0);
CHECK(all_instrs.count<await_receive_instruction_record>() == 0);
CHECK(all_instrs.count<copy_instruction_record>() == 0);
}
TEST_CASE("horizons and epochs notify the executor of unreferenced user allocations after buffer fences",
"[instruction_graph_generator][instruction-graph][fence]") //
{
const auto trigger = GENERATE(values<std::string>({"horizon", "epoch"}));
CAPTURE(trigger);
test_utils::idag_test_context ictx(1 /* nodes */, 0 /* my nid */, 1 /* devices */);
ictx.set_horizon_step(trigger == "horizon" ? 2 : 999);
auto buf = ictx.create_buffer<int>(range(256));
ictx.host_task(buf.get_range()).name("writer").discard_write(buf, acc::one_to_one()).submit();
ictx.fence(buf, subrange({}, buf.get_range()));
if(trigger == "horizon") { ictx.host_task(buf.get_range()).name("horizon trigger").read_write(buf, acc::one_to_one()).submit(); }
ictx.finish();
const auto all_instrs = ictx.query_instructions();
instruction_garbage garbage;
if(trigger == "horizon") {
const auto horizon = all_instrs.select_unique<horizon_instruction_record>();
garbage = horizon->garbage;
} else {
const auto epoch = all_instrs.select_unique<epoch_instruction_record>(
[](const epoch_instruction_record& einstr) { return einstr.epoch_action == epoch_action::shutdown; });
garbage = epoch->garbage;
}
CHECK(garbage.user_allocations.size() == 1);
CHECK(garbage.user_allocations.at(0) != null_allocation_id);
CHECK(garbage.user_allocations.at(0).get_memory_id() == user_memory_id);
}
TEST_CASE("epochs notify the executor of unreferenced user allocations after a buffer is destroyed", "[instruction_graph_generator][instruction-graph]") {
const bool host_initialized = GENERATE(values<int>({false, true}));
CAPTURE(host_initialized);
test_utils::idag_test_context ictx(1 /* nodes */, 0 /* my nid */, 1 /* devices */);
(void)ictx.create_buffer<int>(range(256), host_initialized);
ictx.finish(); // destroys all live buffers
const auto all_instrs = ictx.query_instructions();
const auto shutdown_epoch = all_instrs.select_unique<epoch_instruction_record>(
[](const epoch_instruction_record& einstr) { return einstr.epoch_action == epoch_action::shutdown; });
const auto garbage = shutdown_epoch->garbage;
if(host_initialized) {
CHECK(garbage.user_allocations.size() == 1);
CHECK(garbage.user_allocations.at(0) != null_allocation_id);
CHECK(garbage.user_allocations.at(0).get_memory_id() == user_memory_id);
} else {
CHECK(garbage.user_allocations.empty());
}
}
TEST_CASE("host-object fences introduce the appropriate dependencies", "[instruction_graph_generator][instruction-graph][fence]") {
test_utils::idag_test_context ictx(1 /* nodes */, 0 /* my nid */, 1 /* devices */);
auto ho = ictx.create_host_object();
ictx.master_node_host_task().name("task 1").affect(ho).submit();
ictx.fence(ho);
ictx.master_node_host_task().name("task 2").affect(ho).submit();
ictx.finish();
const auto all_instrs = ictx.query_instructions();
const auto task_1 = all_instrs.select_unique<host_task_instruction_record>("task 1");
const auto fence = all_instrs.select_unique<fence_instruction_record>();
const auto task_2 = all_instrs.select_unique<host_task_instruction_record>("task 2");
CHECK(task_1.successors() == fence);
CHECK(fence.successors() == task_2);
const auto& ho_fence_info = std::get<fence_instruction_record::host_object_variant>(fence->variant);
CHECK(ho_fence_info.hoid == ho.get_id());
}
TEST_CASE("epochs serialize execution and compact dependency tracking", "[instruction_graph_generator][instruction-graph][compaction]") {
enum test_mode_enum { baseline_without_barrier_epoch, test_with_barrier_epoch };
const auto test_mode = GENERATE(values({baseline_without_barrier_epoch, test_with_barrier_epoch}));
INFO((test_mode == baseline_without_barrier_epoch ? "baseline: no epoch is inserted" : "test: barrier epoch is inserted"));
test_utils::idag_test_context ictx(1 /* nodes */, 0 /* my nid */, 1 /* devices */);
ictx.set_horizon_step(999);
auto buf = ictx.create_buffer(range(256));
auto ho = ictx.create_host_object();
// we initialize the buffer on the device to get a single source allocation for the d2h copy after the barrier
ictx.device_compute(range(1)).name("producer").discard_write(buf, acc::all()).submit();
// there are two concurrent writers to `buf` on D0, which would generate two concurrent d2h copies if there were no epoch before the read
ictx.device_compute(range(1)).name("producer").discard_write(buf, acc::fixed<1>({0, 128})).submit();
ictx.device_compute(range(1)).name("producer").discard_write(buf, acc::fixed<1>({128, 256})).submit();
ictx.master_node_host_task().name("producer").affect(ho).submit();
std::optional<task_id> barrier_epoch_tid;
if(test_mode == test_with_barrier_epoch) { barrier_epoch_tid = ictx.epoch(epoch_action::barrier); }
ictx.master_node_host_task().name("consumer").read(buf, acc::all()).affect(ho).submit();
ictx.finish();
// this test is very explicit about each instruction in the graph - things might easily break when touching IDAG generation.
const auto all_instrs = ictx.query_instructions();
// we expect an init epoch + optional barrier epoch + shutdown epoch
CHECK(all_instrs.count<epoch_instruction_record>() == (test_mode == baseline_without_barrier_epoch ? 2 : 3));
if(test_mode == baseline_without_barrier_epoch) {
// Rudimentary check that without an epoch, the IDAG splits the copy to enable concurrency.
// For a more thorough test of this, see "local copies are split on writers to facilitate compute-copy overlap" in instruction_graph_memory_tests.
CHECK(all_instrs.count<copy_instruction_record>() == 2);
return;
}
const auto init_epoch = all_instrs.select_unique<epoch_instruction_record>(ictx.get_initial_epoch_task());
CHECK(init_epoch.predecessors().count() == 0);
const auto all_device_allocs = all_instrs.select_all<alloc_instruction_record>(
[](const alloc_instruction_record& ainstr) { return ainstr.allocation_id.get_memory_id() != host_memory_id; });
CHECK(all_device_allocs.predecessors() == init_epoch);
const auto all_producers = all_instrs.select_all("producer");
// the barrier epoch, aka queue::wait(experimental::barrier) or distr_queue::slow_full_sync(), will transitively depend on all previous instructions
const auto barrier_epoch = all_instrs.select_unique<epoch_instruction_record>(barrier_epoch_tid.value());
CHECK(barrier_epoch->epoch_action == epoch_action::barrier);
CHECK(barrier_epoch.transitive_predecessors() == union_of(init_epoch, all_device_allocs, all_producers));
// There will only be a single d2h copy, since inserting the epoch will compact the last-writer tracking structures, replacing the two concurrent device
// kernels with the single epoch.
const auto host_alloc = all_instrs.select_unique<alloc_instruction_record>(
[](const alloc_instruction_record& ainstr) { return ainstr.allocation_id.get_memory_id() == host_memory_id; });
CHECK(host_alloc.predecessors() == barrier_epoch);
const auto d2h_copy = host_alloc.successors().select_unique<copy_instruction_record>();
const auto consumer = all_instrs.select_unique<host_task_instruction_record>("consumer");
const auto all_frees = all_instrs.select_all<free_instruction_record>();
const auto destroy_ho = all_instrs.select_unique<destroy_host_object_instruction_record>();
const auto shutdown_epoch = consumer.transitive_successors().select_unique<epoch_instruction_record>();
CHECK(shutdown_epoch->epoch_action == epoch_action::shutdown);
CHECK(shutdown_epoch.successors().count() == 0);
// all instructions generated after the barrier epoch must transitively depend on it
CHECK(barrier_epoch.transitive_successors() == union_of(host_alloc, d2h_copy, consumer, all_frees, destroy_ho, shutdown_epoch));
// there can be no dependencies from instructions after the epoch
CHECK(union_of(all_device_allocs, all_producers, barrier_epoch) == union_of(init_epoch, all_device_allocs, all_producers).successors());
// there can be no dependencies to instructions before the epoch
CHECK(union_of(barrier_epoch, host_alloc, d2h_copy, consumer, all_frees, destroy_ho)
.contains(union_of(host_alloc, d2h_copy, consumer, all_frees, destroy_ho, shutdown_epoch).predecessors()));
}
TEST_CASE("horizon application serializes execution and compacts dependency tracking", "[instruction_graph_generator][instruction-graph][compaction]") {
enum test_mode_enum { baseline_without_horizons = 0, baseline_with_unapplied_horizon = 1, test_with_applied_horizon = 2 };
const auto test_mode = GENERATE(values({baseline_without_horizons, baseline_with_unapplied_horizon, test_with_applied_horizon}));
INFO((test_mode == baseline_without_horizons ? "baseline: no horizons are inserted"
: test_mode == baseline_with_unapplied_horizon ? "baseline: horizon is inserted, but not applied"
: "test: applying horizon"));
const auto expected_num_horizons = static_cast<int>(test_mode); // we have defined test_mode_enum accordingly
CAPTURE(expected_num_horizons);
const int horizon_step = 3; // so no producer below triggers a horizon
test_utils::idag_test_context ictx(1 /* nodes */, 0 /* my nid */, 1 /* devices */);
ictx.set_horizon_step(test_mode == baseline_without_horizons ? 999 : horizon_step);
auto buf = ictx.create_buffer(range(256));
auto test_ho = ictx.create_host_object();
auto age_ho = ictx.create_host_object(); // we repeatedly affect this host object to trigger horizon generation
ictx.master_node_host_task().name("producer").affect(test_ho).submit();
ictx.device_compute(range(1)).name("producer").discard_write(buf, acc::all()).submit();
ictx.device_compute(range(1)).name("producer").discard_write(buf, acc::fixed<1>({0, 128})).submit();
ictx.device_compute(range(1)).name("producer").discard_write(buf, acc::fixed<1>({128, 128})).submit();
for(int i = 0; i < expected_num_horizons * horizon_step; ++i) {
ictx.master_node_host_task().name("age").affect(age_ho).submit();
}
ictx.master_node_host_task().name("consumer").read(buf, acc::all()).affect(test_ho).affect(age_ho).submit();
ictx.finish();
// this test is very explicit about each instruction in the graph - things might easily break when touching IDAG generation.
const auto all_instrs = ictx.query_instructions();
const auto all_horizons = all_instrs.select_all<horizon_instruction_record>();
REQUIRE(all_horizons.count() == static_cast<size_t>(expected_num_horizons));
if(test_mode != test_with_applied_horizon) {
// Rudimentary check that without applying a horizon, the IDAG splits the copy to enable concurrency.
// For a more thorough test of this, see "local copies are split on writers to facilitate compute-copy overlap" in instruction_graph_memory_tests.
CHECK(all_instrs.count<copy_instruction_record>() == 2);
return;
}
// instruction records are in the order they were generated
const auto applied_horizon = all_horizons[0];
const auto current_horizon = all_horizons[1];
const auto all_device_allocs = all_instrs.select_all<alloc_instruction_record>(
[](const alloc_instruction_record& ainstr) { return ainstr.allocation_id.get_memory_id() != host_memory_id; });
const auto all_producers = all_instrs.select_all("producer");
CHECK(applied_horizon.transitive_predecessors().contains(union_of(all_device_allocs, all_producers)));
// There will only be a single d2h copy, since inserting the epoch will compact the last-writer tracking structures, replacing the two concurrent device
// kernels with the single epoch.
const auto host_alloc = all_instrs.select_unique<alloc_instruction_record>(
[](const alloc_instruction_record& ainstr) { return ainstr.allocation_id.get_memory_id() == host_memory_id; });
const auto d2h_copy = host_alloc.successors().select_unique<copy_instruction_record>();
const auto consumer = all_instrs.select_unique<host_task_instruction_record>("consumer");
const auto all_frees = all_instrs.select_all<free_instruction_record>();
const auto all_destroy_hos = all_instrs.select_all<destroy_host_object_instruction_record>();
CHECK(applied_horizon.transitive_successors().contains(union_of(host_alloc, d2h_copy, consumer, all_frees, all_destroy_hos, current_horizon)));
// The current horizon has been generated through the dependency chain on `age_ho`, and before submission of the consumer.
CHECK_FALSE(union_of(host_alloc, d2h_copy, consumer, all_frees, all_destroy_hos, current_horizon).transitive_successors().contains(current_horizon));
// The current horizon has not been applied, so no instructions except the shutdown epoch depend on it.
CHECK(current_horizon.successors().is_unique<epoch_instruction_record>());
}
TEST_CASE("instruction_graph_generator throws in tests if it detects an uninitialized read", "[instruction_graph_generator]") {
const size_t num_devices = 2;
const range<1> device_range{num_devices};
test_utils::idag_test_context::policy_set policy;
policy.tm.uninitialized_read_error = error_policy::ignore; // otherwise we get task-level errors first
policy.cggen.uninitialized_read_error = error_policy::ignore; // otherwise we get command-level errors first
test_utils::idag_test_context ictx(1, 0, num_devices, true /* supports d2d copies */, policy);
SECTION("from a read-accessor on a fully uninitialized buffer") {
auto buf = ictx.create_buffer<1>({1});
CHECK_THROWS_WITH((ictx.device_compute(device_range).read(buf, acc::all()).submit()),
"Instructions for device kernel T1 are trying to read B0 {[0,0,0] - [1,1,1]}, which is neither found locally nor has been await-pushed before.");
}
SECTION("from a read-accessor on a partially, locally initialized buffer") {
auto buf = ictx.create_buffer<1>(device_range);
ictx.device_compute(range(1)).discard_write(buf, acc::one_to_one()).submit();
CHECK_THROWS_WITH((ictx.device_compute(device_range).read(buf, acc::all()).submit()),
"Instructions for device kernel T2 are trying to read B0 {[1,0,0] - [2,1,1]}, which is neither found locally nor has been await-pushed before.");
}
SECTION("from a read-accessor on a partially, remotely initialized buffer") {
auto buf = ictx.create_buffer<1>(device_range);
ictx.device_compute(range(1)).discard_write(buf, acc::one_to_one()).submit();
CHECK_THROWS_WITH((ictx.device_compute(device_range).read(buf, acc::one_to_one()).submit()),
"Instructions for device kernel T2 are trying to read B0 {[1,0,0] - [2,1,1]}, which is neither found locally nor has been await-pushed before.");
}
SECTION("from a reduction including the current value of an uninitialized buffer") {
auto buf = ictx.create_buffer<1>({1});
CHECK_THROWS_WITH((ictx.device_compute(device_range).reduce(buf, true /* include current buffer value */).submit()),
"Instructions for device kernel T1 are trying to read B0 {[0,0,0] - [1,1,1]}, which is neither found locally nor has been await-pushed before.");
}
}
TEST_CASE("instruction_graph_generator throws in tests if it detects overlapping writes", "[instruction_graph_generator]") {
const size_t num_devices = 2;
test_utils::idag_test_context ictx(1, 0, num_devices);
auto buf = ictx.create_buffer<2>({20, 20});
SECTION("on all-write") {
CHECK_THROWS_WITH((ictx.device_compute(buf.get_range()).discard_write(buf, acc::all()).submit()),
"Device kernel T1 has overlapping writes on N0 in B0 {[0,0,0] - [20,20,1]}. Choose a non-overlapping range mapper for this write access or "
"constrain the split via experimental::constrain_split to make the access non-overlapping.");
}
SECTION("on neighborhood-write") {
CHECK_THROWS_WITH((ictx.device_compute(buf.get_range()).discard_write(buf, acc::neighborhood({1, 1})).submit()),
"Device kernel T1 has overlapping writes on N0 in B0 {[9,0,0] - [11,20,1]}. Choose a non-overlapping range mapper for this write access or "
"constrain the split via experimental::constrain_split to make the access non-overlapping.");
}
}
// If there is no local last writer nor a pending await push for a buffer region being read, instruction_graph_generator should treat the memory being read from
// as coherent and the alloc_instruction (or the horizon / epoch that subsumes it) as the last writer for establishing dependencies.
TEST_CASE("instruction_graph_generator gracefully handles uninitialized reads when check is disabled", "[instruction_graph_generator]") {
test_utils::idag_test_context::policy_set policy;
policy.tm.uninitialized_read_error = error_policy::ignore;
policy.cggen.uninitialized_read_error = error_policy::ignore;
policy.iggen.uninitialized_read_error = error_policy::ignore;
test_utils::idag_test_context ictx(1 /* num nodes */, 0 /* local nid */, 1 /* num devices */, true /* supports d2d copies */, policy);
auto buf = ictx.create_buffer<1>({1});
SECTION("from a device-kernel read-accessor") { //
ictx.device_compute(range(1)).read(buf, acc::all()).submit();
ictx.finish();
const auto all_instrs = ictx.query_instructions();
const auto kernel = all_instrs.select_unique<device_kernel_instruction_record>();
const auto alloc = all_instrs.select_unique<alloc_instruction_record>();
const auto free = all_instrs.select_unique<free_instruction_record>();
CHECK(kernel.predecessors().contains(alloc));
CHECK(kernel.successors().contains(free));
}
SECTION("from a reduction including the current buffer value") {
ictx.device_compute(range(1)).reduce(buf, true /* include current buffer value */).submit();
ictx.finish();
const auto all_instrs = ictx.query_instructions();
const auto kernel = all_instrs.select_unique<device_kernel_instruction_record>();
const auto local_reduce = all_instrs.select_unique<reduce_instruction_record>();
const auto alloc_device = all_instrs.select_unique<alloc_instruction_record>([](const alloc_instruction_record& ainstr) {
return ainstr.origin == alloc_instruction_record::alloc_origin::buffer && ainstr.allocation_id.get_memory_id() != host_memory_id;
});
const auto alloc_host = all_instrs.select_unique<alloc_instruction_record>([](const alloc_instruction_record& ainstr) {
return ainstr.origin == alloc_instruction_record::alloc_origin::buffer && ainstr.allocation_id.get_memory_id() == host_memory_id;
});
const auto free_device = all_instrs.select_unique<free_instruction_record>(
[&](const free_instruction_record& finstr) { return finstr.allocation_id == alloc_device->allocation_id; });
const auto free_host = all_instrs.select_unique<free_instruction_record>(
[&](const free_instruction_record& finstr) { return finstr.allocation_id == alloc_host->allocation_id; });
CHECK(kernel.transitive_predecessors().contains(alloc_device));
CHECK(kernel.transitive_successors_across<copy_instruction_record>().contains(free_device));
CHECK(local_reduce.transitive_predecessors().contains(alloc_host));
CHECK(local_reduce.successors().contains(free_host));
}
SECTION("from a host-task read-accessor") { //
ictx.host_task(range(1)).read(buf, acc::all()).submit();
ictx.finish();
const auto all_instrs = ictx.query_instructions();
const auto host_task = all_instrs.select_unique<host_task_instruction_record>();
const auto alloc = all_instrs.select_unique<alloc_instruction_record>();
const auto free = all_instrs.select_unique<free_instruction_record>();
CHECK(host_task.predecessors().contains(alloc));
CHECK(host_task.successors().contains(free));
}
}
TEST_CASE("instruction_graph_generator gracefully handles overlapping writes when check is disabled", "[instruction_graph_generator]") {
size_t num_devices = 1;
size_t oversubscription = 1;
SECTION("between chunks of one kernel split among multiple devices") {
num_devices = 2;
oversubscription = 1;
}
SECTION("between an oversubscribed kernel on a single device") {
num_devices = 1;
oversubscription = 2;
}
test_utils::idag_test_context::policy_set policy;
policy.cggen.overlapping_write_error = error_policy::ignore;
policy.iggen.overlapping_write_error = error_policy::ignore;
test_utils::idag_test_context ictx(1 /* num nodes */, 0 /* local nid */, num_devices, true /* supports d2d copies */, policy);
auto buf = ictx.create_buffer<1>({1});
ictx.device_compute(range(2)).name("writer").hint(hints::oversubscribe(oversubscription)).discard_write(buf, acc::all()).submit();
ictx.device_compute(range(1)).name("reader").read(buf, acc::all()).submit();
ictx.finish();
const auto all_instrs = ictx.query_instructions();
const auto all_allocs = all_instrs.select_all<alloc_instruction_record>();
const auto all_writers = all_instrs.select_all<device_kernel_instruction_record>("writer");
const auto reader = all_instrs.select_unique<device_kernel_instruction_record>("reader");
const auto all_frees = all_instrs.select_all<free_instruction_record>();
const auto free_after_reader =
all_frees.select_unique([&](const free_instruction_record& finstr) { return finstr.allocation_id == reader->access_map.at(0).allocation_id; });
CHECK(all_allocs.count() == num_devices);
CHECK(all_writers.count() == num_devices * oversubscription);
CHECK(all_allocs.successors().contains(all_writers));
CHECK(reader.transitive_predecessors().contains(all_writers));
CHECK(reader.successors() == free_after_reader);
CHECK(all_frees.count() == all_allocs.count());
for(const auto& writer : all_writers.iterate()) {
const auto free_after_writer =
all_frees.select_unique([&](const free_instruction_record& finstr) { return finstr.allocation_id == writer->access_map.at(0).allocation_id; });
CHECK(writer.transitive_successors().contains(free_after_writer));
}
}
TEMPLATE_TEST_CASE_SIG("hints::split_2d results in a 2d split", "[instruction_graph_generator][instruction-graph]", ((int Dims), Dims), 2, 3) {
const size_t num_nodes = 1;
const node_id local_nid = 0;
const size_t num_devices = 4;
test_utils::idag_test_context ictx(num_nodes, local_nid, num_devices);
const auto range = test_utils::truncate_range<Dims>({256, 256, 256});
auto buf = ictx.create_buffer(range);
ictx.device_compute(range).hint(hints::split_2d()).discard_write(buf, acc::one_to_one()).submit();
ictx.finish();
const auto all_instrs = ictx.query_instructions();
const auto all_kernels = all_instrs.select_all<device_kernel_instruction_record>();
CHECK(all_kernels.count() == num_devices);
box_vector<3> kernel_boxes;
for(const auto& kernel : all_kernels.iterate()) {
// 2D split means that no tile spans either dimension 0 or 1 entirely
CHECK(kernel->execution_range.get_range()[0] < range[0]);
CHECK(kernel->execution_range.get_range()[1] < range[1]);
REQUIRE(kernel->access_map.size() == 1);
const auto& write = kernel->access_map.front();
CHECK(write.accessed_region_in_buffer == box(kernel->execution_range));
CHECK(write.accessed_bounding_box_in_buffer == box(kernel->execution_range));
kernel_boxes.push_back(box(kernel->execution_range));
}
CHECK(region(std::move(kernel_boxes)) == box(subrange(id<3>(), range_cast<3>(range))));
}
TEMPLATE_TEST_CASE_SIG("oversubscription splits local chunks recursively", "[instruction_graph_generator][instruction-graph]", ((int Dims), Dims), 1, 2, 3) {
const size_t num_nodes = 1;
const node_id local_nid = 0;
const size_t num_devices = 2;
const size_t oversub_factor = 4;
test_utils::idag_test_context ictx(num_nodes, local_nid, num_devices);
auto buf = ictx.create_buffer(test_utils::truncate_range<Dims>({256, 256, 256}));
// This code is duck-typing identical for {device_kernel and host_task}_instruction_record, so we use a generic lambda to DRY it
const auto check_is_box_tiling = [&](const auto& all_device_kernels_or_host_tasks) {
region_builder<3> kernel_boxes;
for(const auto& kernel : all_device_kernels_or_host_tasks.iterate()) {
const auto kernel_box = detail::box(kernel->execution_range);
REQUIRE(kernel->access_map.size() == 1);
const auto& write = kernel->access_map.front();
CHECK(write.accessed_region_in_buffer == kernel_box);
CHECK(write.accessed_bounding_box_in_buffer == kernel_box);
kernel_boxes.add(kernel_box);
}
const auto kernel_region = std::move(kernel_boxes).into_region();
CHECK(kernel_region == box(subrange(id<3>(), range_cast<3>(buf.get_range()))));
};
SECTION("for device kernels") {
SECTION("with a 1d split") {
ictx.device_compute(buf.get_range())
.hint(hints::split_1d())
.hint(hints::oversubscribe(oversub_factor))
.discard_write(buf, acc::one_to_one())
.submit();
}
SECTION("with a 2d split") {
ictx.device_compute(buf.get_range())
.hint(hints::split_2d())
.hint(hints::oversubscribe(oversub_factor))
.discard_write(buf, acc::one_to_one())
.submit();
}
const auto all_instrs = ictx.query_instructions();
const auto all_kernels = all_instrs.select_all<device_kernel_instruction_record>();
CHECK(all_kernels.count() == num_devices * oversub_factor);
CHECK(all_kernels.count(device_id(0)) == oversub_factor);
CHECK(all_kernels.count(device_id(1)) == oversub_factor);
check_is_box_tiling(all_kernels);
}
SECTION("for host tasks kernels") {
SECTION("with a 1d split") {
ictx.host_task(buf.get_range()).hint(hints::split_1d()).hint(hints::oversubscribe(oversub_factor)).discard_write(buf, acc::one_to_one()).submit();
}
SECTION("with a 2d split") {
ictx.host_task(buf.get_range()).hint(hints::split_2d()).hint(hints::oversubscribe(oversub_factor)).discard_write(buf, acc::one_to_one()).submit();
}
const auto all_instrs = ictx.query_instructions();
const auto all_host_tasks = all_instrs.select_all<host_task_instruction_record>();
CHECK(all_host_tasks.count() == oversub_factor);
check_is_box_tiling(all_host_tasks);
}
}
TEST_CASE("instruction_graph_generator throws in tests when detecting unsafe oversubscription", "[instruction_graph_generator]") {
test_utils::idag_test_context ictx(1 /* num nodes */, 0 /* local nid */, 1 /* num devices */);
auto buf = ictx.create_buffer(range(1));
auto ho = ictx.create_host_object();
SECTION("in device-kernels with reductions") {
CHECK_THROWS_WITH(ictx.device_compute(range(256)).hint(hints::oversubscribe(2)).reduce(buf, false /* include_current_buffer_value */).submit(),
"Refusing to oversubscribe device kernel T1 because it performs a reduction.");
}
SECTION("in master-node host tasks") {
CHECK_THROWS_WITH(ictx.master_node_host_task().hint(hints::oversubscribe(2)).submit(),
"Refusing to oversubscribe master-node host task T1 because its iteration space cannot be split.");
}
SECTION("in collective tasks") {
CHECK_THROWS_WITH(ictx.collective_host_task().hint(hints::oversubscribe(2)).submit(),
"Refusing to oversubscribe collective host task T1 because it participates in a collective group.");
}
SECTION("in host tasks with side effects") {
CHECK_THROWS_WITH(ictx.host_task(range(256)).hint(hints::oversubscribe(2)).affect(ho).submit(),
"Refusing to oversubscribe host-compute task T1 because it has side effects.");
}
}
TEST_CASE("instruction_graph_generator gracefully handles unsafe oversubscription when check is disabled", "[instruction_graph_generator]") {
test_utils::idag_test_context::policy_set policy;
policy.iggen.unsafe_oversubscription_error = error_policy::ignore;
test_utils::idag_test_context ictx(1 /* num nodes */, 0 /* local nid */, 1 /* num devices */, true /* supports d2d copies */, policy);
auto buf = ictx.create_buffer(range(1));
auto ho = ictx.create_host_object();
SECTION("in device-kernels with reductions") {
ictx.device_compute(range(256)).hint(hints::oversubscribe(2)).reduce(buf, false /* include_current_buffer_value */).submit();
}
SECTION("on master-node host tasks") { //
ictx.master_node_host_task().hint(hints::oversubscribe(2)).submit();
}
SECTION("on collective tasks") { //
ictx.collective_host_task().hint(hints::oversubscribe(2)).submit();
}
SECTION("on host tasks with side effects") { //
ictx.host_task(range(256)).hint(hints::oversubscribe(2)).affect(ho).submit();
}
ictx.finish();
const auto all_instrs = ictx.query_instructions();
CHECK(all_instrs.count<device_kernel_instruction_record>() + all_instrs.count<host_task_instruction_record>() == 1);
}