66
77#include " test_ucp_memheap.h"
88#include < algorithm>
9+ #include < cstring> // for std::memcpy
910#include < memory>
1011#include < random>
12+ #include < string>
1113
1214extern " C" {
1315#include < ucp/core/ucp_ep.inl>
@@ -20,14 +22,16 @@ extern "C" {
2022class test_ucp_fault_tolerance : public test_ucp_memheap {
2123public:
2224 static void get_test_variants (std::vector<ucp_test_variant>& variants) {
23- add_variant_with_value (variants, UCP_FEATURE_RMA, 0 , " rma" );
25+ add_variant_with_value (variants, UCP_FEATURE_RMA|UCP_FEATURE_AM , 0 , " rma|am " );
2426 }
2527
2628 test_ucp_fault_tolerance () {
2729 configure_peer_failure_settings ();
2830 }
2931
3032protected:
33+ static constexpr uint16_t AM_ID = 0 ;
34+
3135 enum {
3236 GOOD_EP_INDEX = 0 , /* Index for good endpoint */
3337 INJECTED_EP_INDEX = 1 /* Index for failure-injected endpoint */
@@ -40,7 +44,8 @@ class test_ucp_fault_tolerance : public test_ucp_memheap {
4044
4145 enum test_op_t {
4246 TEST_OP_PUT,
43- TEST_OP_GET
47+ TEST_OP_GET,
48+ TEST_OP_FLUSH
4449 };
4550
4651 void init () override {
@@ -51,6 +56,41 @@ class test_ucp_fault_tolerance : public test_ucp_memheap {
5156 sender ().connect (&receiver (), ep_params, INJECTED_EP_INDEX);
5257 receiver ().connect (&sender (), ep_params, GOOD_EP_INDEX);
5358 receiver ().connect (&sender (), ep_params, INJECTED_EP_INDEX);
59+
60+ set_am_handler ();
61+ }
62+
63+ void set_am_handler () {
64+ ucp_am_handler_param_t param;
65+ param.field_mask = UCP_AM_HANDLER_PARAM_FIELD_ID |
66+ UCP_AM_HANDLER_PARAM_FIELD_CB |
67+ UCP_AM_HANDLER_PARAM_FIELD_ARG;
68+ param.id = AM_ID;
69+ param.cb = am_recv_cb;
70+ param.arg = reinterpret_cast <void *>(this );
71+
72+ ucs_status_t status = ucp_worker_set_am_recv_handler (receiver ().worker (),
73+ ¶m);
74+ ASSERT_UCS_OK (status);
75+ }
76+
77+ static ucs_status_t am_recv_cb (void *arg, const void *header,
78+ size_t header_length, void *data,
79+ size_t length,
80+ const ucp_am_recv_param_t *param) {
81+ test_ucp_fault_tolerance *self =
82+ reinterpret_cast <test_ucp_fault_tolerance*>(arg);
83+
84+ if (param->recv_attr & UCP_AM_RECV_ATTR_FLAG_DATA) {
85+ self->m_am_rbuf .resize (length);
86+ std::memcpy (self->m_am_rbuf .data (), data, length);
87+ self->m_am_received = true ;
88+ }
89+
90+ EXPECT_FALSE (param->recv_attr & UCP_AM_RECV_ATTR_FLAG_RNDV) <<
91+ " RNDV is not covered yet" ;
92+
93+ return UCS_OK;
5494 }
5595
5696 /* *
@@ -85,24 +125,80 @@ class test_ucp_fault_tolerance : public test_ucp_memheap {
85125 ++self->m_err_count ;
86126 }
87127
128+ static void shuffle_lanes (std::vector<ucp_lane_index_t > &lanes, const std::string &lane_type) {
129+ if (lanes.size () < 2 ) {
130+ UCS_TEST_SKIP_R (" At least 2 " + lane_type + " s are required, but only " +
131+ std::to_string (lanes.size ()) + " " + lane_type + " s available" );
132+ }
133+
134+ /* Allocate randomizer on heap to avoid exceeding stack frame size limits. */
135+ std::unique_ptr<std::random_device> rnd_device (new std::random_device);
136+ std::unique_ptr<std::mt19937> rng (new std::mt19937 ((*rnd_device)()));
137+ std::shuffle (lanes.begin (), lanes.end (), *rng);
138+
139+ for (ucp_lane_index_t lane : lanes) {
140+ UCS_TEST_MESSAGE << lane_type << " : " << size_t (lane) << " /" << lanes.size ();
141+ }
142+ }
143+
144+ ucp_ep_h get_ucp_ep_for_err_injection (failure_side_t failure_side) {
145+ return (failure_side == FAILURE_SIDE_INITIATOR) ? sender ().ep (0 , INJECTED_EP_INDEX) :
146+ receiver ().ep (0 , INJECTED_EP_INDEX);
147+ }
148+
88149 /* *
89- * Check if we have at least 2 RMA lanes, skip test if not
150+ * Common helper function to test AM send 1KB with injected failure
90151 */
91- void skip_if_insufficient_rma_lanes (ucp_ep_h ep, ucp_lane_index_t failure_lane) {
92- ucp_lane_index_t num_lanes = ucp_ep_num_lanes (ep);
152+ void test_am_with_injected_failure (failure_side_t failure_side, bool flush_after = false ) {
153+ /* TODO: cover case when wireup is in progress, flush here is to complete wireup */
154+ flush_workers ();
93155
94- if (num_lanes <= failure_lane) {
95- UCS_TEST_SKIP_R (" Only " + std::to_string (int (num_lanes)) + " / " + std::to_string (int (failure_lane + 1 )) + " lanes available" );
96- } else {
97- UCS_TEST_MESSAGE << " Endpoint has " << int (num_lanes) << " lanes, failure lane is " << int (failure_lane);
156+ std::vector<ucp_lane_index_t > am_bw_lanes;
157+ const ucp_lane_index_t *am_bw_lane_idx;
158+ const ucp_lane_index_t *am_bw_lanes_key_p =
159+ ucp_ep_config (sender ().ep (0 , INJECTED_EP_INDEX))->key .am_bw_lanes ;
160+
161+ ucs_carray_for_each (am_bw_lane_idx, am_bw_lanes_key_p, UCP_MAX_LANES) {
162+ if (*am_bw_lane_idx != UCP_NULL_LANE) {
163+ am_bw_lanes.push_back (*am_bw_lane_idx);
164+ }
98165 }
166+
167+ shuffle_lanes (am_bw_lanes, " AM BW lane" );
168+
169+ UCS_TEST_MESSAGE << " Attempting AM send before failure injection..." ;
170+ ucs_status_t status = do_am_send_and_wait (sender ().ep (0 , INJECTED_EP_INDEX), am_msg_size (),
171+ flush_after);
172+ EXPECT_EQ (UCS_OK, status) << " AM send returned status: " << ucs_status_string (status);
173+
174+ ucp_ep_h ucp_ep_for_injection = get_ucp_ep_for_err_injection (failure_side);
175+ for (size_t lane_idx = 0 ; lane_idx < am_bw_lanes.size () - 1 ; ++lane_idx) {
176+ ucp_lane_index_t lane = am_bw_lanes[lane_idx];
177+ uct_ep_h uct_ep_for_injection = ucp_ep_get_lane (ucp_ep_for_injection, lane);
178+ status = uct_ep_invalidate (uct_ep_for_injection, 0 );
179+ if (status == UCS_ERR_UNSUPPORTED) {
180+ UCS_TEST_SKIP_R (" uct_ep_invalidate is not supported" );
181+ }
182+
183+ EXPECT_EQ (UCS_OK, status) << " uct_ep_invalidate returned status: "
184+ << ucs_status_string (status);
185+
186+ UCS_TEST_MESSAGE << " Attempting AM send after failure injection on lane "
187+ << size_t (lane) << ' /' << am_bw_lanes.size () << " ..." ;
188+ status = do_am_send_and_wait (sender ().ep (0 , INJECTED_EP_INDEX), am_msg_size (), flush_after);
189+ EXPECT_EQ (UCS_OK, status) << " AM send returned status: " << ucs_status_string (status);
190+ }
191+
192+ short_progress_loop ();
193+ ASSERT_EQ (0 , m_err_count) << " Error callback invoked " << m_err_count << " times" ;
194+ UCS_TEST_MESSAGE << " Success" ;
99195 }
100196
101197 /* *
102198 * Common helper function to test RMA operation with injected failure
103199 */
104200 void test_rma_with_injected_failure (failure_side_t failure_side, test_op_t op) {
105- const size_t size = 1 * UCS_GBYTE ;
201+ const size_t size = rma_msg_size () ;
106202 const char *op_name = (op == TEST_OP_PUT) ? " PUT" : " GET" ;
107203
108204 /* TODO: cover case when wireup is in progress, flush here is to complete wireup */
@@ -118,20 +214,7 @@ class test_ucp_fault_tolerance : public test_ucp_memheap {
118214 }
119215 }
120216
121- if (rma_bw_lanes.size () < 2 ) {
122- UCS_TEST_SKIP_R (" At least 2 RMA BW lanes are required, but only " +
123- std::to_string (rma_bw_lanes.size ()) + " available" );
124- }
125-
126- { // allocate randomizer on heap to avoid exceeding stack frame size limits
127- std::unique_ptr<std::random_device> rnd_device (new std::random_device);
128- std::unique_ptr<std::mt19937> rng (new std::mt19937 ((*rnd_device)()));
129- std::shuffle (rma_bw_lanes.begin (), rma_bw_lanes.end (), *rng);
130- }
131-
132- for (ucp_lane_index_t lane : rma_bw_lanes) {
133- UCS_TEST_MESSAGE << " RMA BW lane: " << size_t (lane) << " /" << rma_bw_lanes.size ();
134- }
217+ shuffle_lanes (rma_bw_lanes, " RMA BW lane" );
135218
136219 mem_buffer lbuf (size, UCS_MEMORY_TYPE_HOST);
137220 mapped_buffer rbuf (size, receiver ());
@@ -151,13 +234,11 @@ class test_ucp_fault_tolerance : public test_ucp_memheap {
151234 EXPECT_EQ (UCS_OK, status) << op_name << " operation returned status: "
152235 << ucs_status_string (status);
153236
154- ucp_ep_h injected_ucp_ep = (failure_side == FAILURE_SIDE_INITIATOR) ?
155- sender ().ep (0 , INJECTED_EP_INDEX) :
156- receiver ().ep (0 , INJECTED_EP_INDEX);
237+ ucp_ep_h ucp_ep_for_injection = get_ucp_ep_for_err_injection (failure_side);
157238 for (size_t lane_idx = 0 ; lane_idx < rma_bw_lanes.size () - 1 ; ++lane_idx) {
158239 ucp_lane_index_t lane = rma_bw_lanes[lane_idx];
159- uct_ep_h injected_uct_ep = ucp_ep_get_lane (injected_ucp_ep , lane);
160- status = uct_ep_invalidate (injected_uct_ep , 0 );
240+ uct_ep_h uct_ep_for_injection = ucp_ep_get_lane (ucp_ep_for_injection , lane);
241+ status = uct_ep_invalidate (uct_ep_for_injection , 0 );
161242 if (status == UCS_ERR_UNSUPPORTED) {
162243 UCS_TEST_SKIP_R (" uct_ep_invalidate is not supported" );
163244 }
@@ -179,6 +260,70 @@ class test_ucp_fault_tolerance : public test_ucp_memheap {
179260 }
180261
181262private:
263+ static size_t rma_msg_size () {
264+ return ucs::limit_buffer_size ((100 * UCS_MBYTE) /
265+ ucs::test_time_multiplier ());
266+ }
267+
268+ static size_t am_msg_size () {
269+ return ucs::limit_buffer_size (UCS_KBYTE);
270+ }
271+
272+ static std::string op_name (unsigned op_mask)
273+ {
274+ std::string name;
275+
276+ if (op_mask & TEST_OP_PUT) {
277+ name += " PUT|" ;
278+ }
279+
280+ if (op_mask & TEST_OP_GET) {
281+ name += " GET|" ;
282+ }
283+
284+ if (op_mask & TEST_OP_FLUSH) {
285+ name += " FLUSH|" ;
286+ }
287+
288+ if (!name.empty ()) {
289+ name.pop_back ();
290+ }
291+
292+ return name;
293+ }
294+
295+ ucs_status_t do_am_send_and_wait (ucp_ep_h ep, size_t size, bool flush_after) {
296+ m_am_received = false ;
297+
298+ mem_buffer sbuf (size, UCS_MEMORY_TYPE_HOST);
299+ sbuf.pattern_fill (m_seed, size);
300+
301+ ucp_request_param_t param;
302+ param.op_attr_mask = 0 ;
303+
304+ ucs_status_ptr_t sptr = ucp_am_send_nbx (ep, AM_ID, NULL , 0 , sbuf.ptr (),
305+ size, ¶m);
306+ // TODO: enable flush_after when PR #11210 is merged
307+ if (false && flush_after) {
308+ ucs_status_t status = request_wait (ucp_ep_flush_nbx (ep, ¶m));
309+ if (status != UCS_OK) {
310+ return status;
311+ }
312+ }
313+
314+ ucs_status_t status = request_wait (sptr);
315+ if (status != UCS_OK) {
316+ return status;
317+ }
318+
319+ while (!m_am_received) {
320+ short_progress_loop ();
321+ }
322+
323+ mem_buffer::pattern_check (m_am_rbuf.data (), size, m_seed);
324+ return UCS_OK;
325+ }
326+
182327 ucs_status_t do_put_and_wait (ucp_ep_h ep, mem_buffer &lbuf, mapped_buffer &rbuf,
183328 ucp_rkey_h rkey, size_t size) {
184329 ucp_request_param_t param;
@@ -224,9 +369,15 @@ class test_ucp_fault_tolerance : public test_ucp_memheap {
224369 }
225370 }
226371
227- size_t m_err_count = 0 ;
228- ucs_status_t m_err_status = UCS_OK;
372+ protected:
229373 static constexpr uint64_t m_seed = 0x12345678 ;
374+
375+ std::vector<uint8_t > m_am_rbuf = std::vector<uint8_t >(am_msg_size());
376+ volatile bool m_am_received = false ;
377+
378+ private:
379+ size_t m_err_count = 0 ;
380+ ucs_status_t m_err_status = UCS_OK;
230381};
231382
232383UCP_INSTANTIATE_TEST_CASE (test_ucp_fault_tolerance)
@@ -250,3 +401,18 @@ UCS_TEST_P(test_ucp_fault_tolerance, get_with_target_failure)
250401{
251402 test_rma_with_injected_failure (FAILURE_SIDE_TARGET, TEST_OP_GET);
252403}
404+
405+ UCS_TEST_P (test_ucp_fault_tolerance, am_send_with_initiator_failure, " MAX_EAGER_LANES=8" , " ZCOPY_THRESH=0" )
406+ {
407+ test_am_with_injected_failure (FAILURE_SIDE_INITIATOR);
408+ }
409+
410+ UCS_TEST_P (test_ucp_fault_tolerance, am_send_with_target_failure, " MAX_EAGER_LANES=8" , " ZCOPY_THRESH=0" )
411+ {
412+ test_am_with_injected_failure (FAILURE_SIDE_TARGET);
413+ }
414+
415+ UCS_TEST_P (test_ucp_fault_tolerance, am_send_flush_with_target_failure, " MAX_EAGER_LANES=8" , " ZCOPY_THRESH=0" )
416+ {
417+ test_am_with_injected_failure (FAILURE_SIDE_TARGET, true );
418+ }
0 commit comments