@@ -312,31 +312,35 @@ std::unique_ptr<UnsignedObject> UnsignedObject::from_bytes(mutils::Deserializati
312312 return std::make_unique<UnsignedObject>(*field_ptr, *counter_ptr, test_state_ptr);
313313}
314314
315+ const int TEST_COORDINATION_PORT = 16000 ;
316+
315317/* *
316- * Command-line arguments: <one_field_size> <two_field_size> <unsigned_size> <num_updates>
318+ * Command-line arguments: <one_field_size> <two_field_size> <unsigned_size> <num_updates> <update_size>
317319 * one_field_size: Maximum size of the subgroup that replicates the one-field signed object
318320 * two_field_size: Maximum size of the subgroup that replicates the two-field signed object
319321 * mixed_field_size: Maximum size of the subgroup that replicates the mixed-signed-and-unsigned-field object
320322 * unsigned_size: Maximum size of the subgroup that replicates the persistent-but-not-signed object
321- * num_updates: Number of randomly-generated 32-byte updates to send to each subgroup
323+ * num_updates: Number of randomly-generated updates to send to each subgroup
324+ * update_size: Size of the updates, in bytes
322325 */
323326int main (int argc, char ** argv) {
324327 pthread_setname_np (pthread_self (), " test_main" );
325328 const std::string characters (" abcdefghijklmnopqrstuvwxyz" );
326329 std::mt19937 random_generator (getpid ());
327330 std::uniform_int_distribution<std::size_t > char_distribution (0 , characters.size () - 1 );
328- const int num_args = 5 ;
331+ const int num_args = 6 ;
329332 if (argc < (num_args + 1 ) || (argc > (num_args + 1 ) && strcmp (" --" , argv[argc - (num_args + 1 )]) != 0 )) {
330333 std::cout << " Invalid command line arguments." << std::endl;
331- std::cout << " Usage: " << argv[0 ] << " [derecho-config-options -- ] one_field_size two_field_size mixed_field_size unsigned_size num_updates" << std::endl;
334+ std::cout << " Usage: " << argv[0 ] << " [derecho-config-options -- ] one_field_size two_field_size mixed_field_size unsigned_size num_updates update_size " << std::endl;
332335 return -1 ;
333336 }
334337
335338 const unsigned int subgroup_1_size = std::stoi (argv[argc - num_args]);
336339 const unsigned int subgroup_2_size = std::stoi (argv[argc - num_args + 1 ]);
337340 const unsigned int subgroup_mixed_size = std::stoi (argv[argc - num_args + 2 ]);
338341 const unsigned int subgroup_unsigned_size = std::stoi (argv[argc - num_args + 3 ]);
339- const unsigned int num_updates = std::stoi (argv[argc - 1 ]);
342+ const unsigned int num_updates = std::stoi (argv[argc - num_args + 4 ]);
343+ const unsigned int update_size = std::stoi (argv[argc - 1 ]);
340344 derecho::Conf::initialize (argc, argv);
341345
342346 derecho::SubgroupInfo subgroup_info (
@@ -401,7 +405,7 @@ int main(int argc, char** argv) {
401405 test_state.my_subgroup_is_unsigned = false ;
402406 // Send random updates
403407 for (unsigned counter = 0 ; counter < num_updates; ++counter) {
404- std::string new_string (' a' , 32 );
408+ std::string new_string (' a' , update_size );
405409 std::generate (new_string.begin (), new_string.end (),
406410 [&]() { return characters[char_distribution (random_generator)]; });
407411 object_handle.ordered_send <RPC_NAME (update_state)>(new_string);
@@ -414,8 +418,8 @@ int main(int argc, char** argv) {
414418 test_state.my_subgroup_is_unsigned = false ;
415419 // Send random updates
416420 for (unsigned counter = 0 ; counter < num_updates; ++counter) {
417- std::string new_foo (' a' , 32 );
418- std::string new_bar (' a' , 32 );
421+ std::string new_foo (' a' , update_size );
422+ std::string new_bar (' a' , update_size );
419423 std::generate (new_foo.begin (), new_foo.end (),
420424 [&]() { return characters[char_distribution (random_generator)]; });
421425 std::generate (new_bar.begin (), new_bar.end (),
@@ -430,7 +434,7 @@ int main(int argc, char** argv) {
430434 test_state.my_subgroup_is_unsigned = false ;
431435 // Send random updates, alternating between the signed, unsigned, and nonpersistent fields
432436 for (unsigned counter = 0 ; counter < num_updates; ++counter) {
433- std::string new_string_value (' a' , 32 );
437+ std::string new_string_value (' a' , update_size );
434438 std::generate (new_string_value.begin (), new_string_value.end (),
435439 [&]() { return characters[char_distribution (random_generator)]; });
436440 if (counter % 3 == 0 ) {
@@ -449,7 +453,7 @@ int main(int argc, char** argv) {
449453 test_state.my_subgroup_is_unsigned = true ;
450454 // Send random updates
451455 for (unsigned counter = 0 ; counter < num_updates; ++counter) {
452- std::string new_string (' a' , 32 );
456+ std::string new_string (' a' , update_size );
453457 std::generate (new_string.begin (), new_string.end (),
454458 [&]() { return characters[char_distribution (random_generator)]; });
455459 object_handle.ordered_send <RPC_NAME (update_state)>(new_string);
@@ -464,6 +468,37 @@ int main(int argc, char** argv) {
464468 test_state.subgroup_finished_condition .wait (lock, [&]() { return test_state.subgroup_finished ; });
465469 }
466470 std::cout << " Done" << std::endl;
467- group.barrier_sync ();
471+ // If this node is the leader, open a socket and wait for all the other nodes to contact it
472+ // Otherwise, open a socket to the leader and exchange IDs to signal that this node is finished
473+ if (group.get_my_rank () == 0 ) {
474+ tcp::connection_listener listener_socket (TEST_COORDINATION_PORT);
475+ std::set<derecho::node_id_t > nodes_contacted;
476+ nodes_contacted.emplace (group.get_my_id ());
477+ std::vector<derecho::node_id_t > members_vector = group.get_members ();
478+ std::set<derecho::node_id_t > all_member_ids (members_vector.begin (), members_vector.end ());
479+ std::vector<tcp::socket> member_connections;
480+ std::cout << " Waiting for other nodes to signal they are finished (members = " << members_vector << " )" << std::endl;
481+ while (nodes_contacted != all_member_ids) {
482+ member_connections.emplace_back (listener_socket.accept ());
483+ derecho::node_id_t finished_member_id;
484+ member_connections.back ().read (finished_member_id);
485+ nodes_contacted.emplace (finished_member_id);
486+ std::cout << " Got a connection from node " << finished_member_id << std::endl;
487+ }
488+ std::cout << " All nodes are done with the test, acknowledging so they can exit" << std::endl;
489+ for (auto & connection : member_connections) {
490+ const int done_signal = 1 ;
491+ connection.write (done_signal);
492+ }
493+ // member_connections sockets will close automatically at the end of this scope
494+ } else {
495+ derecho::ip_addr_t leader_address = group.get_member_addresses ().front ().ip_address ;
496+ std::cout << " Connecting to leader at " << leader_address << " to signal node " << group.get_my_id () << " is done" << std::endl;
497+ tcp::socket leader_connection (leader_address, TEST_COORDINATION_PORT);
498+ leader_connection.write (group.get_my_id ());
499+ std::cout << " Waiting for leader to signal the test is done" << std::endl;
500+ int done;
501+ leader_connection.read (done);
502+ }
468503 group.leave (true );
469504}
0 commit comments