Skip to content

Commit 04a785a

Browse files
committed
Changed signed_log_test to not rely on barrier_sync
Using barrier_sync() in the main thread can "steal" the sync exchanges from a view change's sync() and prevent either one from completing. This can happen if one node reaches barrier_sync() while another node is still starting up and joining (which causes a view change). Using a completely separate socket and port for the final synchronization, as Cascade's perftest does, avoids this interference.
1 parent ceeb679 commit 04a785a

3 files changed

Lines changed: 47 additions & 3 deletions

File tree

include/derecho/sst/detail/sst_impl.hpp

Lines changed: 9 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -155,6 +155,7 @@ void SST<DerivedSST>::put(const std::vector<uint32_t> receiver_ranks, size_t off
155155

156156
template <typename DerivedSST>
157157
void SST<DerivedSST>::put_with_completion(const std::vector<uint32_t> receiver_ranks, size_t offset, size_t size) {
158+
dbg_trace(sst_logger, "put_with_completion called with arguments receiver_ranks={}, offset={}, size={}", receiver_ranks, offset, size);
158159
assert(offset + size <= rowLen);
159160
unsigned int num_writes_posted = 0;
160161
std::vector<bool> posted_write_to(num_members, false);
@@ -267,7 +268,10 @@ void SST<DerivedSST>::sync_with_members() const {
267268
std::tie(node_id, sst_index) = id_index;
268269
if(sst_index != my_index && !row_is_frozen[sst_index]) {
269270
dbg_debug(sst_logger, "TCP sync with node {}, for row {}", node_id, sst_index);
270-
sync(node_id);
271+
bool success = sync(node_id);
272+
if(!success) {
273+
dbg_warn(sst_logger, "TCP sync with node {} was unsuccessful", node_id);
274+
}
271275
}
272276
}
273277
}
@@ -283,7 +287,10 @@ void SST<DerivedSST>::sync_with_members(std::vector<uint32_t> row_indices) const
283287
}
284288
if(!row_is_frozen[row_index]) {
285289
dbg_debug(sst_logger, "TCP sync with node {}, for row {}", members[row_index], row_index);
286-
sync(members[row_index]);
290+
bool success = sync(members[row_index]);
291+
if(!success) {
292+
dbg_warn(sst_logger, "TCP sync with node {} was unsuccessful", members[row_index]);
293+
}
287294
}
288295
}
289296
}

src/applications/tests/unit_tests/signed_log_test.cpp

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,8 @@ std::unique_ptr<UnsignedObject> UnsignedObject::from_bytes(mutils::Deserializati
312312
return std::make_unique<UnsignedObject>(*field_ptr, *counter_ptr, test_state_ptr);
313313
}
314314

315+
const int TEST_COORDINATION_PORT = 16000;
316+
315317
/**
316318
* Command-line arguments: <one_field_size> <two_field_size> <unsigned_size> <num_updates> <update_size>
317319
* one_field_size: Maximum size of the subgroup that replicates the one-field signed object
@@ -466,6 +468,37 @@ int main(int argc, char** argv) {
466468
test_state.subgroup_finished_condition.wait(lock, [&]() { return test_state.subgroup_finished; });
467469
}
468470
std::cout << "Done" << std::endl;
469-
group.barrier_sync();
471+
// If this node is the leader, open a socket and wait for all the other nodes to contact it
472+
// Otherwise, open a socket to the leader and exchange IDs to signal that this node is finished
473+
if(group.get_my_rank() == 0) {
474+
tcp::connection_listener listener_socket(TEST_COORDINATION_PORT);
475+
std::set<derecho::node_id_t> nodes_contacted;
476+
nodes_contacted.emplace(group.get_my_id());
477+
std::vector<derecho::node_id_t> members_vector = group.get_members();
478+
std::set<derecho::node_id_t> all_member_ids(members_vector.begin(), members_vector.end());
479+
std::vector<tcp::socket> member_connections;
480+
std::cout << "Waiting for other nodes to signal they are finished (members = " << members_vector << ")" << std::endl;
481+
while(nodes_contacted != all_member_ids) {
482+
member_connections.emplace_back(listener_socket.accept());
483+
derecho::node_id_t finished_member_id;
484+
member_connections.back().read(finished_member_id);
485+
nodes_contacted.emplace(finished_member_id);
486+
std::cout << "Got a connection from node " << finished_member_id << std::endl;
487+
}
488+
std::cout << "All nodes are done with the test, acknowledging so they can exit" << std::endl;
489+
for(auto& connection : member_connections) {
490+
const int done_signal = 1;
491+
connection.write(done_signal);
492+
}
493+
//member_connections sockets will close automatically at the end of this scope
494+
} else {
495+
derecho::ip_addr_t leader_address = group.get_member_addresses().front().ip_address;
496+
std::cout << "Connecting to leader at " << leader_address << " to signal node " << group.get_my_id() << " is done" << std::endl;
497+
tcp::socket leader_connection(leader_address, TEST_COORDINATION_PORT);
498+
leader_connection.write(group.get_my_id());
499+
std::cout << "Waiting for leader to signal the test is done" << std::endl;
500+
int done;
501+
leader_connection.read(done);
502+
}
470503
group.leave(true);
471504
}

src/sst/lf.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -936,9 +936,13 @@ bool sync(uint32_t r_id) {
936936
int s = 0, t = 0;
937937
try {
938938
if(sst_connections->contains_node(r_id)) {
939+
dbg_trace(g_ctxt.sst_logger, "Sync: exchanging values with node {}", r_id);
939940
sst_connections->exchange(r_id, s, t);
941+
dbg_trace(g_ctxt.sst_logger, "Sync: exchange successful with node {}", r_id);
940942
} else if(external_client_connections->contains_node(r_id)) {
943+
dbg_trace(g_ctxt.sst_logger, "Sync: exchanging values with external client {}", r_id);
941944
external_client_connections->exchange(r_id, s, t);
945+
dbg_trace(g_ctxt.sst_logger, "Sync: exchange successful with external client {}", r_id);
942946
} else {
943947
return false;
944948
}

0 commit comments

Comments
 (0)