From 954aa43e281f3bc3e2384a269699a78421744f0b Mon Sep 17 00:00:00 2001 From: Edward Tremel Date: Thu, 17 Jul 2025 11:10:55 -0400 Subject: [PATCH 1/7] Added some debug logging statements to try to find a bug Sometimes I get a "Warning: Signature did not match my signature!" message while running CascadeChain, even though everything is working fine. I think the problem is a race condition between one node updating its SST and another node reading the SST in handle_verify_request. The SST to_string method needs to be updated to display the signature- related fields so I can debug this more easily, and also needed to be fixed anyway to display the fields in the order they are declared. --- include/derecho/core/detail/derecho_sst.hpp | 2 +- src/core/derecho_sst.cpp | 65 ++++++++++++++------- src/core/git_version.cpp | 4 +- src/core/persistence_manager.cpp | 6 ++ src/core/view_manager.cpp | 3 + 5 files changed, 56 insertions(+), 24 deletions(-) diff --git a/include/derecho/core/detail/derecho_sst.hpp b/include/derecho/core/detail/derecho_sst.hpp index a578e645..aa82bb1c 100644 --- a/include/derecho/core/detail/derecho_sst.hpp +++ b/include/derecho/core/detail/derecho_sst.hpp @@ -289,7 +289,7 @@ class DerechoSST : public sst::SST { void push_row_except_slots(); /** - * Creates a string representation of the local row (not the whole table). + * Creates a string representation of the table for debugging purposes. * This should be converted to an ostream operator<< to follow standards. */ std::string to_string() const; diff --git a/src/core/derecho_sst.cpp b/src/core/derecho_sst.cpp index 6f0170bb..a0db80fc 100644 --- a/src/core/derecho_sst.cpp +++ b/src/core/derecho_sst.cpp @@ -93,21 +93,40 @@ std::string DerechoSST::to_string() const { for(uint row = 0; row < num_rows; ++row) { s << "row=" << row << " "; s << "vid=" << vid[row] << " "; - s << "suspected={ "; + s << "seq_num={ "; + for(unsigned int n = 0; n < seq_num.size(); n++) { + s << seq_num[row][n] << " "; + } + s << "}" + << ", delivered_num={ "; + for(unsigned int n = 0; n < delivered_num.size(); n++) { + s << delivered_num[row][n] << " "; + } + s << "}" + << ", persisted_num={ "; + for(unsigned int n = 0; n < persisted_num.size(); n++) { + s << persisted_num[row][n] << " "; + } + s << "}" + << ", signed_num={ "; + for(unsigned int n = 0; n < signed_num.size(); n++) { + s << signed_num[row][n] << " "; + } + s << "}" + << ", verified_num={ "; + for(unsigned int n = 0; n < verified_num.size(); n++) { + s << verified_num[row][n] << " "; + } + s << "}" + << ", suspected={ "; for(unsigned int n = 0; n < suspected.size(); n++) { s << (suspected[row][n] ? "T" : "F") << " "; } - - s << "}, num_changes=" << num_changes[row] << ", num_committed=" - << num_committed[row] << ", num_installed=" << num_installed[row]; - s << ", changes={ "; + s << "}" + << ", changes={ "; for(int n = 0; n < (num_changes[row] - num_installed[row]); ++n) { s << "(" << changes[row][n].change_id << "," << changes[row][n].leader_id << ") "; } - s << "}, num_acked= " << num_acked[row] << ", num_received={ "; - for(unsigned int n = 0; n < num_received.size(); n++) { - s << num_received[row][n] << " "; - } s << "}, joiner_ips={ "; for(int n = 0; n < (num_changes[row] - num_installed[row]); ++n) { s << joiner_ips[row][n] << " "; @@ -132,27 +151,31 @@ std::string DerechoSST::to_string() const { for(int n = 0; n < (num_changes[row] - num_installed[row]); ++n) { s << joiner_external_ports[row][n] << " "; } - s << "}, seq_num={ "; - for(unsigned int n = 0; n < seq_num.size(); n++) { - s << seq_num[row][n] << " "; - } - s << "}" - << ", delivered_num={ "; - for(unsigned int n = 0; n < delivered_num.size(); n++) { - s << delivered_num[row][n] << " "; + s << "}, num_changes=" << num_changes[row] + << ", num_committed=" << num_committed[row] + << ", num_acked=" << num_acked[row] + << ", num_installed=" << num_installed[row]; + s << ", num_received={ "; + for(unsigned int n = 0; n < num_received.size(); n++) { + s << num_received[row][n] << " "; } s << "}" - << ", wedged = " << (wedged[row] ? "T" : "F") << ", global_min = { "; + << ", wedged = " << (wedged[row] ? "T" : "F") + << ", global_min = { "; for(unsigned int n = 0; n < global_min.size(); n++) { s << global_min[row][n] << " "; } - s << "}, global_min_ready= { "; for(uint n = 0; n < global_min_ready.size(); n++) { - s << global_min_ready[row] << " "; + s << (global_min_ready[row] ? "T" : "F") << " "; + } + s << "}" + << "local_stability_frontier={"; + for(unsigned int n = 0; n < local_stability_frontier.size(); n++) { + s << local_stability_frontier[row][n] << " "; } s << "}" - << ", rip = " << rip[row] << std::endl; + << ", rip = " << (rip[row] ? "T" : "F") << std::endl; } return s.str(); } diff --git a/src/core/git_version.cpp b/src/core/git_version.cpp index 9b865044..6b78b21d 100644 --- a/src/core/git_version.cpp +++ b/src/core/git_version.cpp @@ -13,8 +13,8 @@ namespace derecho { const int MAJOR_VERSION = 2; const int MINOR_VERSION = 4; const int PATCH_VERSION = 1; -const int COMMITS_AHEAD_OF_VERSION = 4; +const int COMMITS_AHEAD_OF_VERSION = 6; const char* VERSION_STRING = "2.4.1"; -const char* VERSION_STRING_PLUS_COMMITS = "2.4.1+4"; +const char* VERSION_STRING_PLUS_COMMITS = "2.4.1+6"; } diff --git a/src/core/persistence_manager.cpp b/src/core/persistence_manager.cpp index 95b3bbdc..bf322acd 100644 --- a/src/core/persistence_manager.cpp +++ b/src/core/persistence_manager.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -175,8 +176,10 @@ void PersistenceManager::handle_persist_request(subgroup_id_t subgroup_id, persi // Only update the signature and signed_num in SST if signed_num has in fact advanced if(object_has_signature && Vc.gmsSST->signed_num[Vc.gmsSST->get_local_index()][subgroup_id] < signed_version) { + dbg_trace(persistence_logger, "PersistenceManager: Copying signature for version {} into SST: {:n}", signed_version, spdlog::to_hex(&signature[0], &signature[signature_size])); gmssst::set(&(Vc.gmsSST->signatures[Vc.gmsSST->get_local_index()][subgroup_id * signature_size]), signature, signature_size); + dbg_trace(persistence_logger, "PersistenceManager: Updating subgroup {} signed_num from {} to {}", subgroup_id, Vc.gmsSST->signed_num[Vc.gmsSST->get_local_index()][subgroup_id], signed_version); gmssst::set(Vc.gmsSST->signed_num[Vc.gmsSST->get_local_index()][subgroup_id], signed_version); Vc.gmsSST->put(Vc.multicast_group->get_shard_sst_indices(subgroup_id), (uint8_t*)(&Vc.gmsSST->signatures[0][subgroup_id * signature_size]) - Vc.gmsSST->getBaseAddress(), @@ -230,6 +233,7 @@ void PersistenceManager::handle_verify_request(subgroup_id_t subgroup_id, persis } // The signature in the other node's "signatures" column corresponds to the version in its "signed_num" column const persistent::version_t other_signed_version = Vc.gmsSST->signed_num[shard_member_rank][subgroup_id]; + dbg_debug(persistence_logger, "PersistenceManager: Node {} has latest signed version {}, checking that version (verify request was for version {})", Vc.members[shard_member_rank], other_signed_version, version); // If this node hasn't finished signing that version yet, we won't be able to check that the other node's signature // matches the local signature. It also means the minimum signed version can't advance past my_signed_version anyway. if(other_signed_version > my_signed_version) { @@ -252,6 +256,8 @@ void PersistenceManager::handle_verify_request(subgroup_id_t subgroup_id, persis minimum_verified_version = std::min(minimum_verified_version, other_signed_version); } else { dbg_warn(persistence_logger, "Signature for version {} from node {} did not match my signature!", other_signed_version, Vc.members[shard_member_rank]); + dbg_debug(persistence_logger, "my_signature = {:n}", spdlog::to_hex(my_signature)); + dbg_debug(persistence_logger, "other_signature = {:n}", spdlog::to_hex(other_signature)); } } //Update verified_num to the lowest version number that successfully verified across all shard members diff --git a/src/core/view_manager.cpp b/src/core/view_manager.cpp index 26832d94..7d5116a7 100644 --- a/src/core/view_manager.cpp +++ b/src/core/view_manager.cpp @@ -520,6 +520,7 @@ void ViewManager::finish_setup() { curr_view->gmsSST->push_row_except_slots(); dbg_debug(vm_logger, "Joining node initialized its SST row from the leader"); } + dbg_trace(vm_logger, "Initial SST state: {}", curr_view->gmsSST->to_string()); // Handle any external-client requests that were waiting for the group to start for(auto& external_socket : startup_pending_external_sockets) { @@ -1595,6 +1596,7 @@ void ViewManager::finish_view_change(DerechoSST& gmsSST) { gmsSST.predicates.remove(leader_suspicion_handle); gmsSST.predicates.remove(follower_suspicion_handle); + dbg_trace(vm_logger, "Old SST state at end of view {}: {}", curr_view->vid, gmsSST.to_string()); dbg_debug(vm_logger, "Starting creation of new SST and DerechoGroup for view {}", next_view->vid); for(const node_id_t failed_node_id : next_view->departed) { dbg_debug(vm_logger, "Removing global TCP connections for failed node {} from RDMC and SST", failed_node_id); @@ -1637,6 +1639,7 @@ void ViewManager::finish_view_change(DerechoSST& gmsSST) { // This will block until everyone responds to SST/RDMC initial handshakes transition_multicast_group(next_subgroup_settings, new_num_received_size, new_slot_size, new_index_field_size); dbg_debug(vm_logger, "Done setting up SST and MulticastGroup for view {}; about to do a sync_with_members()", next_view->vid); + dbg_trace(vm_logger, "My row in new SST initialized to: {}", next_view->gmsSST->to_string()); // New members can now proceed to view_manager.finish_setup(), which will call put() and sync() next_view->gmsSST->push_row_except_slots(); From 7955b1296817b42c01fa54d99a0f2b5b0e0b01c8 Mon Sep 17 00:00:00 2001 From: Edward Tremel Date: Thu, 17 Jul 2025 13:02:29 -0400 Subject: [PATCH 2/7] Minor fix in to_string, added a size parameter to the test The SST to_string should unpack joiner IP addresses with inet_ntoa, since the point of this string is to be human-readable for debugging. The signature race condition doesn't seem to show up in signed_log_test, just in CascadeChain, so maybe the problem is signed_log_test uses updates that are so small they complete quickly. --- .../tests/unit_tests/signed_log_test.cpp | 22 ++++++++++--------- src/core/derecho_sst.cpp | 5 +++-- src/core/git_version.cpp | 4 ++-- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/src/applications/tests/unit_tests/signed_log_test.cpp b/src/applications/tests/unit_tests/signed_log_test.cpp index 853bea45..444fd405 100644 --- a/src/applications/tests/unit_tests/signed_log_test.cpp +++ b/src/applications/tests/unit_tests/signed_log_test.cpp @@ -313,22 +313,23 @@ std::unique_ptr UnsignedObject::from_bytes(mutils::Deserializati } /** - * Command-line arguments: + * Command-line arguments: * one_field_size: Maximum size of the subgroup that replicates the one-field signed object * two_field_size: Maximum size of the subgroup that replicates the two-field signed object * mixed_field_size: Maximum size of the subgroup that replicates the mixed-signed-and-unsigned-field object * unsigned_size: Maximum size of the subgroup that replicates the persistent-but-not-signed object - * num_updates: Number of randomly-generated 32-byte updates to send to each subgroup + * num_updates: Number of randomly-generated updates to send to each subgroup + * update_size: Size of the updates, in bytes */ int main(int argc, char** argv) { pthread_setname_np(pthread_self(), "test_main"); const std::string characters("abcdefghijklmnopqrstuvwxyz"); std::mt19937 random_generator(getpid()); std::uniform_int_distribution char_distribution(0, characters.size() - 1); - const int num_args = 5; + const int num_args = 6; if(argc < (num_args + 1) || (argc > (num_args + 1) && strcmp("--", argv[argc - (num_args + 1)]) != 0)) { std::cout << "Invalid command line arguments." << std::endl; - std::cout << "Usage: " << argv[0] << " [derecho-config-options -- ] one_field_size two_field_size mixed_field_size unsigned_size num_updates" << std::endl; + std::cout << "Usage: " << argv[0] << " [derecho-config-options -- ] one_field_size two_field_size mixed_field_size unsigned_size num_updates update_size" << std::endl; return -1; } @@ -336,7 +337,8 @@ int main(int argc, char** argv) { const unsigned int subgroup_2_size = std::stoi(argv[argc - num_args + 1]); const unsigned int subgroup_mixed_size = std::stoi(argv[argc - num_args + 2]); const unsigned int subgroup_unsigned_size = std::stoi(argv[argc - num_args + 3]); - const unsigned int num_updates = std::stoi(argv[argc - 1]); + const unsigned int num_updates = std::stoi(argv[argc - num_args + 4]); + const unsigned int update_size = std::stoi(argv[argc - 1]); derecho::Conf::initialize(argc, argv); derecho::SubgroupInfo subgroup_info( @@ -401,7 +403,7 @@ int main(int argc, char** argv) { test_state.my_subgroup_is_unsigned = false; //Send random updates for(unsigned counter = 0; counter < num_updates; ++counter) { - std::string new_string('a', 32); + std::string new_string('a', update_size); std::generate(new_string.begin(), new_string.end(), [&]() { return characters[char_distribution(random_generator)]; }); object_handle.ordered_send(new_string); @@ -414,8 +416,8 @@ int main(int argc, char** argv) { test_state.my_subgroup_is_unsigned = false; //Send random updates for(unsigned counter = 0; counter < num_updates; ++counter) { - std::string new_foo('a', 32); - std::string new_bar('a', 32); + std::string new_foo('a', update_size); + std::string new_bar('a', update_size); std::generate(new_foo.begin(), new_foo.end(), [&]() { return characters[char_distribution(random_generator)]; }); std::generate(new_bar.begin(), new_bar.end(), @@ -430,7 +432,7 @@ int main(int argc, char** argv) { test_state.my_subgroup_is_unsigned = false; //Send random updates, alternating between the signed, unsigned, and nonpersistent fields for(unsigned counter = 0; counter < num_updates; ++counter) { - std::string new_string_value('a', 32); + std::string new_string_value('a', update_size); std::generate(new_string_value.begin(), new_string_value.end(), [&]() { return characters[char_distribution(random_generator)]; }); if(counter % 3 == 0) { @@ -449,7 +451,7 @@ int main(int argc, char** argv) { test_state.my_subgroup_is_unsigned = true; //Send random updates for(unsigned counter = 0; counter < num_updates; ++counter) { - std::string new_string('a', 32); + std::string new_string('a', update_size); std::generate(new_string.begin(), new_string.end(), [&]() { return characters[char_distribution(random_generator)]; }); object_handle.ordered_send(new_string); diff --git a/src/core/derecho_sst.cpp b/src/core/derecho_sst.cpp index a0db80fc..ba505da0 100644 --- a/src/core/derecho_sst.cpp +++ b/src/core/derecho_sst.cpp @@ -1,5 +1,6 @@ #include +#include #include #include @@ -129,7 +130,7 @@ std::string DerechoSST::to_string() const { } s << "}, joiner_ips={ "; for(int n = 0; n < (num_changes[row] - num_installed[row]); ++n) { - s << joiner_ips[row][n] << " "; + s << inet_ntoa(in_addr{joiner_ips[row][n]}) << " "; } s << "}, joiner_gms_ports={ "; for(int n = 0; n < (num_changes[row] - num_installed[row]); ++n) { @@ -170,7 +171,7 @@ std::string DerechoSST::to_string() const { s << (global_min_ready[row] ? "T" : "F") << " "; } s << "}" - << "local_stability_frontier={"; + << ", local_stability_frontier={"; for(unsigned int n = 0; n < local_stability_frontier.size(); n++) { s << local_stability_frontier[row][n] << " "; } diff --git a/src/core/git_version.cpp b/src/core/git_version.cpp index 6b78b21d..983ffbe5 100644 --- a/src/core/git_version.cpp +++ b/src/core/git_version.cpp @@ -13,8 +13,8 @@ namespace derecho { const int MAJOR_VERSION = 2; const int MINOR_VERSION = 4; const int PATCH_VERSION = 1; -const int COMMITS_AHEAD_OF_VERSION = 6; +const int COMMITS_AHEAD_OF_VERSION = 7; const char* VERSION_STRING = "2.4.1"; -const char* VERSION_STRING_PLUS_COMMITS = "2.4.1+6"; +const char* VERSION_STRING_PLUS_COMMITS = "2.4.1+7"; } From 3e648703a7f7ef6fb4a057e9127992f5913d2fee Mon Sep 17 00:00:00 2001 From: Edward Tremel Date: Thu, 17 Jul 2025 16:03:50 -0400 Subject: [PATCH 3/7] Fixed initialization bug with signed_num I discovered the source of at least one type of "Signature did not match" warning: The signed_num field in DerechoSST was initialized to 0, not -1, even though 0 is a valid version (the first version). This meant handle_verify_request would try to verify another node's signature on version 0 even if it hadn't yet put a signature in the SST signature field. In fact, handle_verify_request needs a special case to check if a peer node has not yet computed any signatures; it won't gracefully degrade because 0 is a real version number and -1 is invalid as an argument to subgroup_object->get_signature(). --- src/core/git_version.cpp | 4 ++-- src/core/multicast_group.cpp | 1 + src/core/persistence_manager.cpp | 5 +++++ 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/core/git_version.cpp b/src/core/git_version.cpp index 983ffbe5..911dd775 100644 --- a/src/core/git_version.cpp +++ b/src/core/git_version.cpp @@ -13,8 +13,8 @@ namespace derecho { const int MAJOR_VERSION = 2; const int MINOR_VERSION = 4; const int PATCH_VERSION = 1; -const int COMMITS_AHEAD_OF_VERSION = 7; +const int COMMITS_AHEAD_OF_VERSION = 8; const char* VERSION_STRING = "2.4.1"; -const char* VERSION_STRING_PLUS_COMMITS = "2.4.1+7"; +const char* VERSION_STRING_PLUS_COMMITS = "2.4.1+8"; } diff --git a/src/core/multicast_group.cpp b/src/core/multicast_group.cpp index 4081fa7f..04e9b848 100644 --- a/src/core/multicast_group.cpp +++ b/src/core/multicast_group.cpp @@ -484,6 +484,7 @@ void MulticastGroup::initialize_sst_row() { sst->seq_num[member_index][j] = -1; sst->delivered_num[member_index][j] = -1; sst->persisted_num[member_index][j] = -1; + sst->signed_num[member_index][j] = -1; sst->verified_num[member_index][j] = -1; } memset(const_cast(sst->signatures[member_index]), 0, sst->signatures.size()); diff --git a/src/core/persistence_manager.cpp b/src/core/persistence_manager.cpp index bf322acd..993d0caa 100644 --- a/src/core/persistence_manager.cpp +++ b/src/core/persistence_manager.cpp @@ -240,6 +240,11 @@ void PersistenceManager::handle_verify_request(subgroup_id_t subgroup_id, persis dbg_debug(persistence_logger, "PersistenceManager: Skipping signature check on version {} from node {} because this node hasn't signed that version yet", other_signed_version, Vc.members[shard_member_rank]); continue; } + // If the other node hasn't signed anything yet, its signed_num will still be at the initial value of -1, which is not a valid version + if(other_signed_version == -1) { + dbg_debug(persistence_logger, "PersistenceManager: Skipping signature check for node {} because it has not signed any versions yet", Vc.members[shard_member_rank]); + continue; + } //Copy out the signature so it can't change during verification std::vector other_signature(signature_size); gmssst::set(other_signature.data(), From 94094ddd79bd92b0db50c2b28ea78bcce9da3ccd Mon Sep 17 00:00:00 2001 From: Edward Tremel Date: Thu, 17 Jul 2025 17:27:17 -0400 Subject: [PATCH 4/7] Added debug logging to SST to find another bug View changes keep getting stuck while trying to run signed_log_test. --- include/derecho/sst/detail/sst_impl.hpp | 4 ++++ include/derecho/sst/sst.hpp | 4 ++++ src/core/git_version.cpp | 4 ++-- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/include/derecho/sst/detail/sst_impl.hpp b/include/derecho/sst/detail/sst_impl.hpp index 26fa1307..049ebbbe 100644 --- a/include/derecho/sst/detail/sst_impl.hpp +++ b/include/derecho/sst/detail/sst_impl.hpp @@ -177,6 +177,7 @@ void SST::put_with_completion(const std::vector receiver_r // perform a remote RDMA write on the owner of the row ce_ctxt[index].set_remote_id(res_vec[index]->remote_id); ce_ctxt[index].set_ce_idx(ce_idx); + dbg_trace(sst_logger, "Created a CE context for write to row {}: {:p} -> {{ {}, {}, {} }}", index, static_cast(&ce_ctxt[index]), ce_ctxt[index].ce_idx(), ce_ctxt[index].remote_id(), ce_ctxt[index].is_managed()); res_vec[index]->post_remote_write_with_completion(&ce_ctxt[index], offset, size); posted_write_to[index] = true; num_writes_posted++; @@ -216,6 +217,7 @@ void SST::put_with_completion(const std::vector receiver_r if (result && result.value() == 1) { polled_successfully_from[index] = true; } else { + dbg_debug(sst_logger, "put_with_completion marked row {} failed due to not receiving a completion", index); failed_node_indexes.push_back(index); } } @@ -264,6 +266,7 @@ void SST::sync_with_members() const { for(auto const& id_index : members_by_id) { std::tie(node_id, sst_index) = id_index; if(sst_index != my_index && !row_is_frozen[sst_index]) { + dbg_debug(sst_logger, "TCP sync with node {}, for row {}", node_id, sst_index); sync(node_id); } } @@ -279,6 +282,7 @@ void SST::sync_with_members(std::vector row_indices) const continue; } if(!row_is_frozen[row_index]) { + dbg_debug(sst_logger, "TCP sync with node {}, for row {}", members[row_index], row_index); sync(members[row_index]); } } diff --git a/include/derecho/sst/sst.hpp b/include/derecho/sst/sst.hpp index a9287da9..91f639ba 100644 --- a/include/derecho/sst/sst.hpp +++ b/include/derecho/sst/sst.hpp @@ -181,6 +181,9 @@ class SST { DerivedSST* derived_this; + /** Pointer to the logger for the SST module, which is created in sst::lf_initialize() */ + std::shared_ptr sst_logger; + std::vector background_threads; std::atomic thread_shutdown; @@ -236,6 +239,7 @@ class SST { public: SST(DerivedSST* derived_class_pointer, const SSTParams& params) : derived_this(derived_class_pointer), + sst_logger(spdlog::get(LoggerFactory::SST_LOGGER_NAME)), thread_shutdown(false), poll_cq_timeout_ms(derecho::getConfUInt32(derecho::Conf::DERECHO_SST_POLL_CQ_TIMEOUT_MS)), members(params.members), diff --git a/src/core/git_version.cpp b/src/core/git_version.cpp index 911dd775..0c6ae7e1 100644 --- a/src/core/git_version.cpp +++ b/src/core/git_version.cpp @@ -13,8 +13,8 @@ namespace derecho { const int MAJOR_VERSION = 2; const int MINOR_VERSION = 4; const int PATCH_VERSION = 1; -const int COMMITS_AHEAD_OF_VERSION = 8; +const int COMMITS_AHEAD_OF_VERSION = 9; const char* VERSION_STRING = "2.4.1"; -const char* VERSION_STRING_PLUS_COMMITS = "2.4.1+8"; +const char* VERSION_STRING_PLUS_COMMITS = "2.4.1+9"; } From b7e401f2b570d5996e59d486fdfe6e189e3455de Mon Sep 17 00:00:00 2001 From: Edward Tremel Date: Fri, 18 Jul 2025 14:19:02 -0400 Subject: [PATCH 5/7] Partially handled race conditions when reading the signature Since the "signature" SST field is not actually monotonic (a later signature won't validate as an earlier signature), and signatures don't contain their version numbers, there is actually a race condition between a remote node updating "signature" and "signed_num" while the local node is trying to do handle_verify_request. Since the remote node writes in the order (1) signature (2) signed_num, while the local node reads in the order (1) signed_num (2) signature, the local node could read an earlier signed_num and then a later signature. This causes signature validation to fail because the local node retrieves the wrong signature from its log to compare (e.g. it retrieves signature 15, when the signature it got from the SST is the signature for version 17). This race condition can be mostly fixed by having the local node re-read the signed_num after reading the signature and compare it with the value it has cached. If signed_num changed while reading the signature, it means the read interleaved with the remote node's write and we need to try again. If signed_num has the same value twice, it's probably the correct value (matches the signature). It is still slightly possible to get an inconsistent read if the remote node overwrites the signature exactly as the local node is reading it (so it won't have changed signed_num when the local node reads it a second time), but the window for that race condition is much smaller. --- src/core/git_version.cpp | 4 ++-- src/core/persistence_manager.cpp | 27 ++++++++++++++++++++------- 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/src/core/git_version.cpp b/src/core/git_version.cpp index 0c6ae7e1..43676be3 100644 --- a/src/core/git_version.cpp +++ b/src/core/git_version.cpp @@ -13,8 +13,8 @@ namespace derecho { const int MAJOR_VERSION = 2; const int MINOR_VERSION = 4; const int PATCH_VERSION = 1; -const int COMMITS_AHEAD_OF_VERSION = 9; +const int COMMITS_AHEAD_OF_VERSION = 10; const char* VERSION_STRING = "2.4.1"; -const char* VERSION_STRING_PLUS_COMMITS = "2.4.1+9"; +const char* VERSION_STRING_PLUS_COMMITS = "2.4.1+10"; } diff --git a/src/core/persistence_manager.cpp b/src/core/persistence_manager.cpp index 993d0caa..862342de 100644 --- a/src/core/persistence_manager.cpp +++ b/src/core/persistence_manager.cpp @@ -231,9 +231,8 @@ void PersistenceManager::handle_verify_request(subgroup_id_t subgroup_id, persis if(shard_member_rank == Vc.gmsSST->get_local_index()) { continue; } - // The signature in the other node's "signatures" column corresponds to the version in its "signed_num" column - const persistent::version_t other_signed_version = Vc.gmsSST->signed_num[shard_member_rank][subgroup_id]; - dbg_debug(persistence_logger, "PersistenceManager: Node {} has latest signed version {}, checking that version (verify request was for version {})", Vc.members[shard_member_rank], other_signed_version, version); + // Read the other node's signed_num column, which should indicate what version has been signed + persistent::version_t other_signed_version = Vc.gmsSST->signed_num[shard_member_rank][subgroup_id]; // If this node hasn't finished signing that version yet, we won't be able to check that the other node's signature // matches the local signature. It also means the minimum signed version can't advance past my_signed_version anyway. if(other_signed_version > my_signed_version) { @@ -245,11 +244,25 @@ void PersistenceManager::handle_verify_request(subgroup_id_t subgroup_id, persis dbg_debug(persistence_logger, "PersistenceManager: Skipping signature check for node {} because it has not signed any versions yet", Vc.members[shard_member_rank]); continue; } - //Copy out the signature so it can't change during verification + // Attempt to read the signature from the signature column, then check signed_num again + // If the other node updated the signature while we were reading it, signed_num will change, + // so we have to read both of them again std::vector other_signature(signature_size); - gmssst::set(other_signature.data(), - &Vc.gmsSST->signatures[shard_member_rank][subgroup_id * signature_size], - signature_size); + bool consistent_read = false; + while(!consistent_read) { + gmssst::set(other_signature.data(), + &Vc.gmsSST->signatures[shard_member_rank][subgroup_id * signature_size], + signature_size); + persistent::version_t new_signed_version = Vc.gmsSST->signed_num[shard_member_rank][subgroup_id]; + if(new_signed_version == other_signed_version) { + consistent_read = true; + } else { + dbg_debug(persistence_logger, "PersistenceManager: Read signed_num {}, then {} from node {}, trying again", other_signed_version, new_signed_version, Vc.members[shard_member_rank]); + other_signed_version = new_signed_version; + } + } + dbg_debug(persistence_logger, "PersistenceManager: Got a consistent read of signed_num {} and signature from node {}", other_signed_version, Vc.members[shard_member_rank]); + // Retrieve this node's signature on that version std::vector my_signature = subgroup_object->get_signature(other_signed_version); if(my_signature.size() == 0) { From ceeb679fcd7efcd79d27f7cb968dd63175a94ccd Mon Sep 17 00:00:00 2001 From: Edward Tremel Date: Mon, 28 Jul 2025 17:45:40 -0400 Subject: [PATCH 6/7] Fixed initialization of signed_num in new views; fixed default ports Another contributor to the race condition in handle_verify_request is the fact that the new-view initialization of the SST leaves all of the delivery-counter fields (like persisted_num) at their default values, assuming the first delivery predicate will update them, but handle_verify_request can read from a remote node's signed_num before that node has written to it. It's safer to initialize signed_num and the corresponding signature to its correct value from the last view. verified_num is written before it's read, like persisted_num, so it doesn't need to be initialized. Separately, the default values for the Derecho ports cause problems when trying to test locally on Linux machines because they overlap with the Linux ephemeral port range (32768 and higher). Tests keep failing because a process accidentally connects to itself, or to another node's system-assigned outgoing port for a different connection. It's better to use lower values for these ports so they won't overlap with the ephemeral range, and these values are still high enough to be unused by any other service. --- include/derecho/conf/conf.hpp | 16 ++++++++-------- include/derecho/core/detail/derecho_sst.hpp | 2 +- src/conf/derecho-sample.cfg | 18 +++++++++--------- src/conf/derecho_node-sample.cfg | 10 +++++----- src/core/derecho_sst.cpp | 11 ++++++++++- src/core/git_version.cpp | 4 ++-- src/core/view_manager.cpp | 4 ++-- 7 files changed, 37 insertions(+), 28 deletions(-) diff --git a/include/derecho/conf/conf.hpp b/include/derecho/conf/conf.hpp index 8c7162fc..e35f355f 100644 --- a/include/derecho/conf/conf.hpp +++ b/include/derecho/conf/conf.hpp @@ -83,16 +83,16 @@ class Conf { std::map config = { // [DERECHO] {DERECHO_CONTACT_IP, "127.0.0.1"}, - {DERECHO_CONTACT_PORT, "23580"}, + {DERECHO_CONTACT_PORT, "14480"}, {DERECHO_RESTART_LEADERS, "127.0.0.1"}, - {DERECHO_RESTART_LEADER_PORTS, "23580"}, + {DERECHO_RESTART_LEADER_PORTS, "14480"}, {DERECHO_LOCAL_ID, "0"}, {DERECHO_LOCAL_IP, "127.0.0.1"}, - {DERECHO_GMS_PORT, "23580"}, - {DERECHO_STATE_TRANSFER_PORT, "28366"}, - {DERECHO_SST_PORT, "37683"}, - {DERECHO_RDMC_PORT, "31675"}, - {DERECHO_EXTERNAL_PORT, "32645"}, + {DERECHO_GMS_PORT, "14480"}, + {DERECHO_STATE_TRANSFER_PORT, "14560"}, + {DERECHO_SST_PORT, "14660"}, + {DERECHO_RDMC_PORT, "14720"}, + {DERECHO_EXTERNAL_PORT, "14880"}, {SUBGROUP_DEFAULT_RDMC_SEND_ALGORITHM, "binomial_send"}, {DERECHO_P2P_LOOP_BUSY_WAIT_BEFORE_SLEEP_MS, "250"}, {DERECHO_SST_POLL_CQ_TIMEOUT_MS, "2000"}, @@ -126,7 +126,7 @@ class Conf { {LOGGER_DEFAULT_LOG_NAME, "derecho_debug"}, {LOGGER_DEFAULT_LOG_LEVEL, "debug"}, {LOGGER_LOG_TO_TERMINAL, "true"}, - {LOGGER_LOG_FILE_DEPTH, "3"}}; + {LOGGER_LOG_FILE_DEPTH, "10"}}; public: // the option for parsing command line with getopt(not GetPot!!!) diff --git a/include/derecho/core/detail/derecho_sst.hpp b/include/derecho/core/detail/derecho_sst.hpp index aa82bb1c..02b16cb3 100644 --- a/include/derecho/core/detail/derecho_sst.hpp +++ b/include/derecho/core/detail/derecho_sst.hpp @@ -389,7 +389,7 @@ void set(volatile char* string_array, const std::string& value); void increment(volatile int& member); -bool equals(const volatile char& string_array, const std::string& value); +bool equals(const volatile char* string_array, const std::string& value); } // namespace gmssst diff --git a/src/conf/derecho-sample.cfg b/src/conf/derecho-sample.cfg index 90dfb4ac..08a3fb5c 100644 --- a/src/conf/derecho-sample.cfg +++ b/src/conf/derecho-sample.cfg @@ -2,21 +2,21 @@ # contact ip - the active leader's ip address contact_ip = 127.0.0.1 # contact port - the active leader's gms port -contact_port = 23580 +contact_port = 14480 # list of leaders to contact during a restart in priority order restart_leaders = 127.0.0.1,127.0.0.1 # list of GMS ports of the restart leaders, in the same order -restart_leader_ports = 23580,23581 +restart_leader_ports = 14480,14481 # derecho gms port -gms_port = 23580 +gms_port = 14480 # derecho state-transfer port -state_transfer_port = 28366 +state_transfer_port = 14560 # sst tcp port -sst_port = 37683 +sst_port = 14660 # rdmc tcp port -rdmc_port = 31675 +rdmc_port = 14720 # externel tcp port listening to external clients -external_port = 32645 +external_port = 14880 # Maximum possible node ID value # Node IDs are 32-bit integers, but all Derecho systems will have # many fewer nodes than this. Derecho will pre-allocate space for a @@ -145,8 +145,8 @@ persistence_log_level = info # Whether logs should be printed to the terminal as well as saved to files (default is true) log_to_terminal = true # The number of older log files to save. Log files are rotated automatically -# when the current one reaches 1MB in size. Default is 3. -log_file_depth = 3 +# when the current one reaches 1MB in size. Default is 10. +log_file_depth = 10 # optional layout configurations [LAYOUT] diff --git a/src/conf/derecho_node-sample.cfg b/src/conf/derecho_node-sample.cfg index 811363c7..91a883c4 100644 --- a/src/conf/derecho_node-sample.cfg +++ b/src/conf/derecho_node-sample.cfg @@ -6,15 +6,15 @@ local_ip = 127.0.0.1 # These ports are optional: nodes will use the values from the group derecho.cfg by default, # but if the port options are specified here they will override the defaults. # derecho gms port -gms_port = 23580 +gms_port = 14481 # derecho state-transfer port -state_transfer_port = 28366 +state_transfer_port = 14561 # sst tcp port -sst_port = 37683 +sst_port = 14661 # rdmc tcp port -rdmc_port = 31675 +rdmc_port = 14721 # externel tcp port listening to external clients -external_port = 32645 +external_port = 14881 # RDMA section contains configurations of the following diff --git a/src/core/derecho_sst.cpp b/src/core/derecho_sst.cpp index ba505da0..6bf0792a 100644 --- a/src/core/derecho_sst.cpp +++ b/src/core/derecho_sst.cpp @@ -33,7 +33,7 @@ void DerechoSST::init_local_row_from_previous(const DerechoSST& old_sst, const i memcpy(const_cast(joiner_external_ports[local_row]), const_cast(old_sst.joiner_external_ports[row] + num_changes_installed), (old_sst.joiner_external_ports.size() - num_changes_installed) * sizeof(uint16_t)); - //TODO: Copy over the last committed signature here? Or will the new view start with no signatures? + // Initialize these flags to false for(size_t i = 0; i < suspected.size(); ++i) { suspected[local_row][i] = false; } @@ -43,11 +43,20 @@ void DerechoSST::init_local_row_from_previous(const DerechoSST& old_sst, const i for(size_t i = 0; i < global_min.size(); ++i) { global_min[local_row][i] = 0; } + // Initialize these counters with their previous values, except num_installed gets incremented num_changes[local_row] = old_sst.num_changes[row]; num_committed[local_row] = old_sst.num_committed[row]; num_acked[local_row] = old_sst.num_acked[row]; num_installed[local_row] = old_sst.num_installed[row] + num_changes_installed; wedged[local_row] = false; + // Copy over the previous view's last known signature and signed_num array + // Unlike seq_num and persisted_num, these may get read by other nodes before they are updated in the new view + memcpy(const_cast(signed_num[local_row]), + const_cast(old_sst.signed_num[row]), + old_sst.signed_num.size() * sizeof(persistent::version_t)); + memcpy(const_cast(signatures[local_row]), + const_cast(old_sst.signatures[row]), + old_sst.signatures.size() * sizeof(uint8_t)); } void DerechoSST::init_local_change_proposals(const int other_row) { diff --git a/src/core/git_version.cpp b/src/core/git_version.cpp index 43676be3..3070f091 100644 --- a/src/core/git_version.cpp +++ b/src/core/git_version.cpp @@ -13,8 +13,8 @@ namespace derecho { const int MAJOR_VERSION = 2; const int MINOR_VERSION = 4; const int PATCH_VERSION = 1; -const int COMMITS_AHEAD_OF_VERSION = 10; +const int COMMITS_AHEAD_OF_VERSION = 11; const char* VERSION_STRING = "2.4.1"; -const char* VERSION_STRING_PLUS_COMMITS = "2.4.1+10"; +const char* VERSION_STRING_PLUS_COMMITS = "2.4.1+11"; } diff --git a/src/core/view_manager.cpp b/src/core/view_manager.cpp index 7d5116a7..baae216b 100644 --- a/src/core/view_manager.cpp +++ b/src/core/view_manager.cpp @@ -1484,7 +1484,7 @@ void ViewManager::deliver_ragged_trim(DerechoSST& gmsSST) { gmsSST.persisted_num[member_row][subgroup_id]) .second < last_delivered_seq_num) { - dbg_debug(vm_logger, "Waiting for node {} to finish persisting update {}", shard_member, last_delivered_seq_num); + dbg_trace(vm_logger, "Waiting for node {} to finish persisting update {}", shard_member, last_delivered_seq_num); return false; } } @@ -1639,7 +1639,6 @@ void ViewManager::finish_view_change(DerechoSST& gmsSST) { // This will block until everyone responds to SST/RDMC initial handshakes transition_multicast_group(next_subgroup_settings, new_num_received_size, new_slot_size, new_index_field_size); dbg_debug(vm_logger, "Done setting up SST and MulticastGroup for view {}; about to do a sync_with_members()", next_view->vid); - dbg_trace(vm_logger, "My row in new SST initialized to: {}", next_view->gmsSST->to_string()); // New members can now proceed to view_manager.finish_setup(), which will call put() and sync() next_view->gmsSST->push_row_except_slots(); @@ -1650,6 +1649,7 @@ void ViewManager::finish_view_change(DerechoSST& gmsSST) { old_views_cv.notify_all(); } curr_view = std::move(next_view); + dbg_trace(vm_logger, "New SST in view {}: {}", curr_view->vid, curr_view->gmsSST->to_string()); if(any_persistent_objects) { // Write the new view to disk before using it From 04a785a34f5ac08c93e25b5d5aaa6201614e152b Mon Sep 17 00:00:00 2001 From: Edward Tremel Date: Wed, 30 Jul 2025 13:41:03 -0400 Subject: [PATCH 7/7] Changed signed_log_test to not rely on barrier_sync Using barrier_sync() in the main thread can "steal" the sync exchanges from a view-change's sync() and prevent either one from completing, which can happen if one node reaches the barrier_sync() while another one is still starting up and joining (causing a view change). It should work better to use a completely different socket and port for the final synchronization, like Cascade's perftest does. --- include/derecho/sst/detail/sst_impl.hpp | 11 ++++-- .../tests/unit_tests/signed_log_test.cpp | 35 ++++++++++++++++++- src/sst/lf.cpp | 4 +++ 3 files changed, 47 insertions(+), 3 deletions(-) diff --git a/include/derecho/sst/detail/sst_impl.hpp b/include/derecho/sst/detail/sst_impl.hpp index 049ebbbe..e7ed09b8 100644 --- a/include/derecho/sst/detail/sst_impl.hpp +++ b/include/derecho/sst/detail/sst_impl.hpp @@ -155,6 +155,7 @@ void SST::put(const std::vector receiver_ranks, size_t off template void SST::put_with_completion(const std::vector receiver_ranks, size_t offset, size_t size) { + dbg_trace(sst_logger, "put_with_completion called with arguments receiver_ranks={}, offset={}, size={}", receiver_ranks, offset, size); assert(offset + size <= rowLen); unsigned int num_writes_posted = 0; std::vector posted_write_to(num_members, false); @@ -267,7 +268,10 @@ void SST::sync_with_members() const { std::tie(node_id, sst_index) = id_index; if(sst_index != my_index && !row_is_frozen[sst_index]) { dbg_debug(sst_logger, "TCP sync with node {}, for row {}", node_id, sst_index); - sync(node_id); + bool success = sync(node_id); + if(!success) { + dbg_warn(sst_logger, "TCP sync with node {} was unsuccessful", node_id); + } } } } @@ -283,7 +287,10 @@ void SST::sync_with_members(std::vector row_indices) const } if(!row_is_frozen[row_index]) { dbg_debug(sst_logger, "TCP sync with node {}, for row {}", members[row_index], row_index); - sync(members[row_index]); + bool success = sync(members[row_index]); + if(!success) { + dbg_warn(sst_logger, "TCP sync with node {} was unsuccessful", members[row_index]); + } } } } diff --git a/src/applications/tests/unit_tests/signed_log_test.cpp b/src/applications/tests/unit_tests/signed_log_test.cpp index 444fd405..39b3306d 100644 --- a/src/applications/tests/unit_tests/signed_log_test.cpp +++ b/src/applications/tests/unit_tests/signed_log_test.cpp @@ -312,6 +312,8 @@ std::unique_ptr UnsignedObject::from_bytes(mutils::Deserializati return std::make_unique(*field_ptr, *counter_ptr, test_state_ptr); } +const int TEST_COORDINATION_PORT = 16000; + /** * Command-line arguments: * one_field_size: Maximum size of the subgroup that replicates the one-field signed object @@ -466,6 +468,37 @@ int main(int argc, char** argv) { test_state.subgroup_finished_condition.wait(lock, [&]() { return test_state.subgroup_finished; }); } std::cout << "Done" << std::endl; - group.barrier_sync(); + // If this node is the leader, open a socket and wait for all the other nodes to contact it + // Otherwise, open a socket to the leader and exchange IDs to signal that this node is finished + if(group.get_my_rank() == 0) { + tcp::connection_listener listener_socket(TEST_COORDINATION_PORT); + std::set nodes_contacted; + nodes_contacted.emplace(group.get_my_id()); + std::vector members_vector = group.get_members(); + std::set all_member_ids(members_vector.begin(), members_vector.end()); + std::vector member_connections; + std::cout << "Waiting for other nodes to signal they are finished (members = " << members_vector << ")" << std::endl; + while(nodes_contacted != all_member_ids) { + member_connections.emplace_back(listener_socket.accept()); + derecho::node_id_t finished_member_id; + member_connections.back().read(finished_member_id); + nodes_contacted.emplace(finished_member_id); + std::cout << "Got a connection from node " << finished_member_id << std::endl; + } + std::cout << "All nodes are done with the test, acknowledging so they can exit" << std::endl; + for(auto& connection : member_connections) { + const int done_signal = 1; + connection.write(done_signal); + } + //member_connections sockets will close automatically at the end of this scope + } else { + derecho::ip_addr_t leader_address = group.get_member_addresses().front().ip_address; + std::cout << "Connecting to leader at " << leader_address << " to signal node " << group.get_my_id() << " is done" << std::endl; + tcp::socket leader_connection(leader_address, TEST_COORDINATION_PORT); + leader_connection.write(group.get_my_id()); + std::cout << "Waiting for leader to signal the test is done" << std::endl; + int done; + leader_connection.read(done); + } group.leave(true); } diff --git a/src/sst/lf.cpp b/src/sst/lf.cpp index 8dfc739b..6e4b2e58 100644 --- a/src/sst/lf.cpp +++ b/src/sst/lf.cpp @@ -936,9 +936,13 @@ bool sync(uint32_t r_id) { int s = 0, t = 0; try { if(sst_connections->contains_node(r_id)) { + dbg_trace(g_ctxt.sst_logger, "Sync: exchanging values with node {}", r_id); sst_connections->exchange(r_id, s, t); + dbg_trace(g_ctxt.sst_logger, "Sync: exchange successful with node {}", r_id); } else if(external_client_connections->contains_node(r_id)) { + dbg_trace(g_ctxt.sst_logger, "Sync: exchanging values with external client {}", r_id); external_client_connections->exchange(r_id, s, t); + dbg_trace(g_ctxt.sst_logger, "Sync: exchange successful with external client {}", r_id); } else { return false; }