diff --git a/include/derecho/conf/conf.hpp b/include/derecho/conf/conf.hpp index 8c7162fc..e35f355f 100644 --- a/include/derecho/conf/conf.hpp +++ b/include/derecho/conf/conf.hpp @@ -83,16 +83,16 @@ class Conf { std::map config = { // [DERECHO] {DERECHO_CONTACT_IP, "127.0.0.1"}, - {DERECHO_CONTACT_PORT, "23580"}, + {DERECHO_CONTACT_PORT, "14480"}, {DERECHO_RESTART_LEADERS, "127.0.0.1"}, - {DERECHO_RESTART_LEADER_PORTS, "23580"}, + {DERECHO_RESTART_LEADER_PORTS, "14480"}, {DERECHO_LOCAL_ID, "0"}, {DERECHO_LOCAL_IP, "127.0.0.1"}, - {DERECHO_GMS_PORT, "23580"}, - {DERECHO_STATE_TRANSFER_PORT, "28366"}, - {DERECHO_SST_PORT, "37683"}, - {DERECHO_RDMC_PORT, "31675"}, - {DERECHO_EXTERNAL_PORT, "32645"}, + {DERECHO_GMS_PORT, "14480"}, + {DERECHO_STATE_TRANSFER_PORT, "14560"}, + {DERECHO_SST_PORT, "14660"}, + {DERECHO_RDMC_PORT, "14720"}, + {DERECHO_EXTERNAL_PORT, "14880"}, {SUBGROUP_DEFAULT_RDMC_SEND_ALGORITHM, "binomial_send"}, {DERECHO_P2P_LOOP_BUSY_WAIT_BEFORE_SLEEP_MS, "250"}, {DERECHO_SST_POLL_CQ_TIMEOUT_MS, "2000"}, @@ -126,7 +126,7 @@ class Conf { {LOGGER_DEFAULT_LOG_NAME, "derecho_debug"}, {LOGGER_DEFAULT_LOG_LEVEL, "debug"}, {LOGGER_LOG_TO_TERMINAL, "true"}, - {LOGGER_LOG_FILE_DEPTH, "3"}}; + {LOGGER_LOG_FILE_DEPTH, "10"}}; public: // the option for parsing command line with getopt(not GetPot!!!) diff --git a/include/derecho/core/detail/derecho_sst.hpp b/include/derecho/core/detail/derecho_sst.hpp index a578e645..02b16cb3 100644 --- a/include/derecho/core/detail/derecho_sst.hpp +++ b/include/derecho/core/detail/derecho_sst.hpp @@ -289,7 +289,7 @@ class DerechoSST : public sst::SST { void push_row_except_slots(); /** - * Creates a string representation of the local row (not the whole table). + * Creates a string representation of the table for debugging purposes. * This should be converted to an ostream operator<< to follow standards. 
*/ std::string to_string() const; @@ -389,7 +389,7 @@ void set(volatile char* string_array, const std::string& value); void increment(volatile int& member); -bool equals(const volatile char& string_array, const std::string& value); +bool equals(const volatile char* string_array, const std::string& value); } // namespace gmssst diff --git a/include/derecho/sst/detail/sst_impl.hpp b/include/derecho/sst/detail/sst_impl.hpp index 26fa1307..e7ed09b8 100644 --- a/include/derecho/sst/detail/sst_impl.hpp +++ b/include/derecho/sst/detail/sst_impl.hpp @@ -155,6 +155,7 @@ void SST::put(const std::vector receiver_ranks, size_t off template void SST::put_with_completion(const std::vector receiver_ranks, size_t offset, size_t size) { + dbg_trace(sst_logger, "put_with_completion called with arguments receiver_ranks={}, offset={}, size={}", receiver_ranks, offset, size); assert(offset + size <= rowLen); unsigned int num_writes_posted = 0; std::vector posted_write_to(num_members, false); @@ -177,6 +178,7 @@ void SST::put_with_completion(const std::vector receiver_r // perform a remote RDMA write on the owner of the row ce_ctxt[index].set_remote_id(res_vec[index]->remote_id); ce_ctxt[index].set_ce_idx(ce_idx); + dbg_trace(sst_logger, "Created a CE context for write to row {}: {:p} -> {{ {}, {}, {} }}", index, static_cast(&ce_ctxt[index]), ce_ctxt[index].ce_idx(), ce_ctxt[index].remote_id(), ce_ctxt[index].is_managed()); res_vec[index]->post_remote_write_with_completion(&ce_ctxt[index], offset, size); posted_write_to[index] = true; num_writes_posted++; @@ -216,6 +218,7 @@ void SST::put_with_completion(const std::vector receiver_r if (result && result.value() == 1) { polled_successfully_from[index] = true; } else { + dbg_debug(sst_logger, "put_with_completion marked row {} failed due to not receiving a completion", index); failed_node_indexes.push_back(index); } } @@ -264,7 +267,11 @@ void SST::sync_with_members() const { for(auto const& id_index : members_by_id) { std::tie(node_id, 
sst_index) = id_index; if(sst_index != my_index && !row_is_frozen[sst_index]) { - sync(node_id); + dbg_debug(sst_logger, "TCP sync with node {}, for row {}", node_id, sst_index); + bool success = sync(node_id); + if(!success) { + dbg_warn(sst_logger, "TCP sync with node {} was unsuccessful", node_id); + } } } } @@ -279,7 +286,11 @@ void SST::sync_with_members(std::vector row_indices) const continue; } if(!row_is_frozen[row_index]) { - sync(members[row_index]); + dbg_debug(sst_logger, "TCP sync with node {}, for row {}", members[row_index], row_index); + bool success = sync(members[row_index]); + if(!success) { + dbg_warn(sst_logger, "TCP sync with node {} was unsuccessful", members[row_index]); + } } } } diff --git a/include/derecho/sst/sst.hpp b/include/derecho/sst/sst.hpp index a9287da9..91f639ba 100644 --- a/include/derecho/sst/sst.hpp +++ b/include/derecho/sst/sst.hpp @@ -181,6 +181,9 @@ class SST { DerivedSST* derived_this; + /** Pointer to the logger for the SST module, which is created in sst::lf_initialize() */ + std::shared_ptr sst_logger; + std::vector background_threads; std::atomic thread_shutdown; @@ -236,6 +239,7 @@ class SST { public: SST(DerivedSST* derived_class_pointer, const SSTParams& params) : derived_this(derived_class_pointer), + sst_logger(spdlog::get(LoggerFactory::SST_LOGGER_NAME)), thread_shutdown(false), poll_cq_timeout_ms(derecho::getConfUInt32(derecho::Conf::DERECHO_SST_POLL_CQ_TIMEOUT_MS)), members(params.members), diff --git a/src/applications/tests/unit_tests/signed_log_test.cpp b/src/applications/tests/unit_tests/signed_log_test.cpp index 853bea45..39b3306d 100644 --- a/src/applications/tests/unit_tests/signed_log_test.cpp +++ b/src/applications/tests/unit_tests/signed_log_test.cpp @@ -312,23 +312,26 @@ std::unique_ptr UnsignedObject::from_bytes(mutils::Deserializati return std::make_unique(*field_ptr, *counter_ptr, test_state_ptr); } +const int TEST_COORDINATION_PORT = 16000; + /** - * Command-line arguments: + * Command-line 
arguments: * one_field_size: Maximum size of the subgroup that replicates the one-field signed object * two_field_size: Maximum size of the subgroup that replicates the two-field signed object * mixed_field_size: Maximum size of the subgroup that replicates the mixed-signed-and-unsigned-field object * unsigned_size: Maximum size of the subgroup that replicates the persistent-but-not-signed object - * num_updates: Number of randomly-generated 32-byte updates to send to each subgroup + * num_updates: Number of randomly-generated updates to send to each subgroup + * update_size: Size of the updates, in bytes */ int main(int argc, char** argv) { pthread_setname_np(pthread_self(), "test_main"); const std::string characters("abcdefghijklmnopqrstuvwxyz"); std::mt19937 random_generator(getpid()); std::uniform_int_distribution char_distribution(0, characters.size() - 1); - const int num_args = 5; + const int num_args = 6; if(argc < (num_args + 1) || (argc > (num_args + 1) && strcmp("--", argv[argc - (num_args + 1)]) != 0)) { std::cout << "Invalid command line arguments." 
<< std::endl; - std::cout << "Usage: " << argv[0] << " [derecho-config-options -- ] one_field_size two_field_size mixed_field_size unsigned_size num_updates" << std::endl; + std::cout << "Usage: " << argv[0] << " [derecho-config-options -- ] one_field_size two_field_size mixed_field_size unsigned_size num_updates update_size" << std::endl; return -1; } @@ -336,7 +339,8 @@ int main(int argc, char** argv) { const unsigned int subgroup_2_size = std::stoi(argv[argc - num_args + 1]); const unsigned int subgroup_mixed_size = std::stoi(argv[argc - num_args + 2]); const unsigned int subgroup_unsigned_size = std::stoi(argv[argc - num_args + 3]); - const unsigned int num_updates = std::stoi(argv[argc - 1]); + const unsigned int num_updates = std::stoi(argv[argc - num_args + 4]); + const unsigned int update_size = std::stoi(argv[argc - 1]); derecho::Conf::initialize(argc, argv); derecho::SubgroupInfo subgroup_info( @@ -401,7 +405,7 @@ int main(int argc, char** argv) { test_state.my_subgroup_is_unsigned = false; //Send random updates for(unsigned counter = 0; counter < num_updates; ++counter) { - std::string new_string('a', 32); + std::string new_string(update_size, 'a'); /* count first, then fill char; contents are overwritten by std::generate */ std::generate(new_string.begin(), new_string.end(), [&]() { return characters[char_distribution(random_generator)]; }); object_handle.ordered_send(new_string); @@ -414,8 +418,8 @@ int main(int argc, char** argv) { test_state.my_subgroup_is_unsigned = false; //Send random updates for(unsigned counter = 0; counter < num_updates; ++counter) { - std::string new_foo('a', 32); - std::string new_bar('a', 32); + std::string new_foo(update_size, 'a'); /* count first, then fill char */ + std::string new_bar(update_size, 'a'); /* count first, then fill char */ std::generate(new_foo.begin(), new_foo.end(), [&]() { return characters[char_distribution(random_generator)]; }); std::generate(new_bar.begin(), new_bar.end(), @@ -430,7 +434,7 @@ int main(int argc, char** argv) { test_state.my_subgroup_is_unsigned = false; //Send random updates, alternating between the signed, unsigned, and 
nonpersistent fields for(unsigned counter = 0; counter < num_updates; ++counter) { - std::string new_string_value('a', 32); + std::string new_string_value(update_size, 'a'); /* count first, then fill char */ std::generate(new_string_value.begin(), new_string_value.end(), [&]() { return characters[char_distribution(random_generator)]; }); if(counter % 3 == 0) { @@ -449,7 +453,7 @@ int main(int argc, char** argv) { test_state.my_subgroup_is_unsigned = true; //Send random updates for(unsigned counter = 0; counter < num_updates; ++counter) { - std::string new_string('a', 32); + std::string new_string(update_size, 'a'); /* count first, then fill char */ std::generate(new_string.begin(), new_string.end(), [&]() { return characters[char_distribution(random_generator)]; }); object_handle.ordered_send(new_string); @@ -464,6 +468,37 @@ int main(int argc, char** argv) { test_state.subgroup_finished_condition.wait(lock, [&]() { return test_state.subgroup_finished; }); } std::cout << "Done" << std::endl; - group.barrier_sync(); + // If this node is the leader, open a socket and wait for all the other nodes to contact it + // Otherwise, open a socket to the leader and exchange IDs to signal that this node is finished + if(group.get_my_rank() == 0) { + tcp::connection_listener listener_socket(TEST_COORDINATION_PORT); + std::set nodes_contacted; + nodes_contacted.emplace(group.get_my_id()); + std::vector members_vector = group.get_members(); + std::set all_member_ids(members_vector.begin(), members_vector.end()); + std::vector member_connections; + std::cout << "Waiting for other nodes to signal they are finished (members = " << members_vector << ")" << std::endl; + while(nodes_contacted != all_member_ids) { + member_connections.emplace_back(listener_socket.accept()); + derecho::node_id_t finished_member_id; + member_connections.back().read(finished_member_id); + nodes_contacted.emplace(finished_member_id); + std::cout << "Got a connection from node " << finished_member_id << std::endl; + } + std::cout << "All nodes are done with the test, 
acknowledging so they can exit" << std::endl; + for(auto& connection : member_connections) { + const int done_signal = 1; + connection.write(done_signal); + } + //member_connections sockets will close automatically at the end of this scope + } else { + derecho::ip_addr_t leader_address = group.get_member_addresses().front().ip_address; + std::cout << "Connecting to leader at " << leader_address << " to signal node " << group.get_my_id() << " is done" << std::endl; + tcp::socket leader_connection(leader_address, TEST_COORDINATION_PORT); + leader_connection.write(group.get_my_id()); + std::cout << "Waiting for leader to signal the test is done" << std::endl; + int done; + leader_connection.read(done); + } group.leave(true); } diff --git a/src/conf/derecho-sample.cfg b/src/conf/derecho-sample.cfg index 90dfb4ac..08a3fb5c 100644 --- a/src/conf/derecho-sample.cfg +++ b/src/conf/derecho-sample.cfg @@ -2,21 +2,21 @@ # contact ip - the active leader's ip address contact_ip = 127.0.0.1 # contact port - the active leader's gms port -contact_port = 23580 +contact_port = 14480 # list of leaders to contact during a restart in priority order restart_leaders = 127.0.0.1,127.0.0.1 # list of GMS ports of the restart leaders, in the same order -restart_leader_ports = 23580,23581 +restart_leader_ports = 14480,14481 # derecho gms port -gms_port = 23580 +gms_port = 14480 # derecho state-transfer port -state_transfer_port = 28366 +state_transfer_port = 14560 # sst tcp port -sst_port = 37683 +sst_port = 14660 # rdmc tcp port -rdmc_port = 31675 +rdmc_port = 14720 # externel tcp port listening to external clients -external_port = 32645 +external_port = 14880 # Maximum possible node ID value # Node IDs are 32-bit integers, but all Derecho systems will have # many fewer nodes than this. 
Derecho will pre-allocate space for a @@ -145,8 +145,8 @@ persistence_log_level = info # Whether logs should be printed to the terminal as well as saved to files (default is true) log_to_terminal = true # The number of older log files to save. Log files are rotated automatically -# when the current one reaches 1MB in size. Default is 3. -log_file_depth = 3 +# when the current one reaches 1MB in size. Default is 10. +log_file_depth = 10 # optional layout configurations [LAYOUT] diff --git a/src/conf/derecho_node-sample.cfg b/src/conf/derecho_node-sample.cfg index 811363c7..91a883c4 100644 --- a/src/conf/derecho_node-sample.cfg +++ b/src/conf/derecho_node-sample.cfg @@ -6,15 +6,15 @@ local_ip = 127.0.0.1 # These ports are optional: nodes will use the values from the group derecho.cfg by default, # but if the port options are specified here they will override the defaults. # derecho gms port -gms_port = 23580 +gms_port = 14481 # derecho state-transfer port -state_transfer_port = 28366 +state_transfer_port = 14561 # sst tcp port -sst_port = 37683 +sst_port = 14661 # rdmc tcp port -rdmc_port = 31675 +rdmc_port = 14721 # externel tcp port listening to external clients -external_port = 32645 +external_port = 14881 # RDMA section contains configurations of the following diff --git a/src/core/derecho_sst.cpp b/src/core/derecho_sst.cpp index 6f0170bb..6bf0792a 100644 --- a/src/core/derecho_sst.cpp +++ b/src/core/derecho_sst.cpp @@ -1,5 +1,6 @@ #include +#include #include #include @@ -32,7 +33,7 @@ void DerechoSST::init_local_row_from_previous(const DerechoSST& old_sst, const i memcpy(const_cast(joiner_external_ports[local_row]), const_cast(old_sst.joiner_external_ports[row] + num_changes_installed), (old_sst.joiner_external_ports.size() - num_changes_installed) * sizeof(uint16_t)); - //TODO: Copy over the last committed signature here? Or will the new view start with no signatures? 
+ // Initialize these flags to false for(size_t i = 0; i < suspected.size(); ++i) { suspected[local_row][i] = false; } @@ -42,11 +43,20 @@ void DerechoSST::init_local_row_from_previous(const DerechoSST& old_sst, const i for(size_t i = 0; i < global_min.size(); ++i) { global_min[local_row][i] = 0; } + // Initialize these counters with their previous values, except num_installed gets incremented num_changes[local_row] = old_sst.num_changes[row]; num_committed[local_row] = old_sst.num_committed[row]; num_acked[local_row] = old_sst.num_acked[row]; num_installed[local_row] = old_sst.num_installed[row] + num_changes_installed; wedged[local_row] = false; + // Copy over the previous view's last known signature and signed_num array + // Unlike seq_num and persisted_num, these may get read by other nodes before they are updated in the new view + memcpy(const_cast(signed_num[local_row]), + const_cast(old_sst.signed_num[row]), + old_sst.signed_num.size() * sizeof(persistent::version_t)); + memcpy(const_cast(signatures[local_row]), + const_cast(old_sst.signatures[row]), + old_sst.signatures.size() * sizeof(uint8_t)); } void DerechoSST::init_local_change_proposals(const int other_row) { @@ -93,24 +103,43 @@ std::string DerechoSST::to_string() const { for(uint row = 0; row < num_rows; ++row) { s << "row=" << row << " "; s << "vid=" << vid[row] << " "; - s << "suspected={ "; + s << "seq_num={ "; + for(unsigned int n = 0; n < seq_num.size(); n++) { + s << seq_num[row][n] << " "; + } + s << "}" + << ", delivered_num={ "; + for(unsigned int n = 0; n < delivered_num.size(); n++) { + s << delivered_num[row][n] << " "; + } + s << "}" + << ", persisted_num={ "; + for(unsigned int n = 0; n < persisted_num.size(); n++) { + s << persisted_num[row][n] << " "; + } + s << "}" + << ", signed_num={ "; + for(unsigned int n = 0; n < signed_num.size(); n++) { + s << signed_num[row][n] << " "; + } + s << "}" + << ", verified_num={ "; + for(unsigned int n = 0; n < verified_num.size(); n++) { + s << 
verified_num[row][n] << " "; + } + s << "}" + << ", suspected={ "; for(unsigned int n = 0; n < suspected.size(); n++) { s << (suspected[row][n] ? "T" : "F") << " "; } - - s << "}, num_changes=" << num_changes[row] << ", num_committed=" - << num_committed[row] << ", num_installed=" << num_installed[row]; - s << ", changes={ "; + s << "}" + << ", changes={ "; for(int n = 0; n < (num_changes[row] - num_installed[row]); ++n) { s << "(" << changes[row][n].change_id << "," << changes[row][n].leader_id << ") "; } - s << "}, num_acked= " << num_acked[row] << ", num_received={ "; - for(unsigned int n = 0; n < num_received.size(); n++) { - s << num_received[row][n] << " "; - } s << "}, joiner_ips={ "; for(int n = 0; n < (num_changes[row] - num_installed[row]); ++n) { - s << joiner_ips[row][n] << " "; + s << inet_ntoa(in_addr{joiner_ips[row][n]}) << " "; } s << "}, joiner_gms_ports={ "; for(int n = 0; n < (num_changes[row] - num_installed[row]); ++n) { @@ -132,27 +161,31 @@ std::string DerechoSST::to_string() const { for(int n = 0; n < (num_changes[row] - num_installed[row]); ++n) { s << joiner_external_ports[row][n] << " "; } - s << "}, seq_num={ "; - for(unsigned int n = 0; n < seq_num.size(); n++) { - s << seq_num[row][n] << " "; - } - s << "}" - << ", delivered_num={ "; - for(unsigned int n = 0; n < delivered_num.size(); n++) { - s << delivered_num[row][n] << " "; + s << "}, num_changes=" << num_changes[row] + << ", num_committed=" << num_committed[row] + << ", num_acked=" << num_acked[row] + << ", num_installed=" << num_installed[row]; + s << ", num_received={ "; + for(unsigned int n = 0; n < num_received.size(); n++) { + s << num_received[row][n] << " "; } s << "}" - << ", wedged = " << (wedged[row] ? 
"T" : "F") + << ", global_min = { "; for(unsigned int n = 0; n < global_min.size(); n++) { s << global_min[row][n] << " "; } - s << "}, global_min_ready= { "; for(uint n = 0; n < global_min_ready.size(); n++) { - s << global_min_ready[row] << " "; + s << (global_min_ready[row][n] ? "T" : "F") << " "; /* index by n: the bare row is a pointer and always tests true */ + } + s << "}" + << ", local_stability_frontier={"; + for(unsigned int n = 0; n < local_stability_frontier.size(); n++) { + s << local_stability_frontier[row][n] << " "; } s << "}" - << ", rip = " << rip[row] << std::endl; + << ", rip = " << (rip[row] ? "T" : "F") << std::endl; } return s.str(); } diff --git a/src/core/git_version.cpp b/src/core/git_version.cpp index 9b865044..3070f091 100644 --- a/src/core/git_version.cpp +++ b/src/core/git_version.cpp @@ -13,8 +13,8 @@ namespace derecho { const int MAJOR_VERSION = 2; const int MINOR_VERSION = 4; const int PATCH_VERSION = 1; -const int COMMITS_AHEAD_OF_VERSION = 4; +const int COMMITS_AHEAD_OF_VERSION = 11; const char* VERSION_STRING = "2.4.1"; -const char* VERSION_STRING_PLUS_COMMITS = "2.4.1+4"; +const char* VERSION_STRING_PLUS_COMMITS = "2.4.1+11"; } diff --git a/src/core/multicast_group.cpp b/src/core/multicast_group.cpp index 4081fa7f..04e9b848 100644 --- a/src/core/multicast_group.cpp +++ b/src/core/multicast_group.cpp @@ -484,6 +484,7 @@ void MulticastGroup::initialize_sst_row() { sst->seq_num[member_index][j] = -1; sst->delivered_num[member_index][j] = -1; sst->persisted_num[member_index][j] = -1; + sst->signed_num[member_index][j] = -1; sst->verified_num[member_index][j] = -1; } memset(const_cast(sst->signatures[member_index]), 0, sst->signatures.size()); diff --git a/src/core/persistence_manager.cpp b/src/core/persistence_manager.cpp index 95b3bbdc..862342de 100644 --- a/src/core/persistence_manager.cpp +++ b/src/core/persistence_manager.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -175,8 +176,10 @@ void PersistenceManager::handle_persist_request(subgroup_id_t subgroup_id, persi 
// Only update the signature and signed_num in SST if signed_num has in fact advanced if(object_has_signature && Vc.gmsSST->signed_num[Vc.gmsSST->get_local_index()][subgroup_id] < signed_version) { + dbg_trace(persistence_logger, "PersistenceManager: Copying signature for version {} into SST: {:n}", signed_version, spdlog::to_hex(&signature[0], &signature[signature_size])); gmssst::set(&(Vc.gmsSST->signatures[Vc.gmsSST->get_local_index()][subgroup_id * signature_size]), signature, signature_size); + dbg_trace(persistence_logger, "PersistenceManager: Updating subgroup {} signed_num from {} to {}", subgroup_id, Vc.gmsSST->signed_num[Vc.gmsSST->get_local_index()][subgroup_id], signed_version); gmssst::set(Vc.gmsSST->signed_num[Vc.gmsSST->get_local_index()][subgroup_id], signed_version); Vc.gmsSST->put(Vc.multicast_group->get_shard_sst_indices(subgroup_id), (uint8_t*)(&Vc.gmsSST->signatures[0][subgroup_id * signature_size]) - Vc.gmsSST->getBaseAddress(), @@ -228,19 +231,38 @@ void PersistenceManager::handle_verify_request(subgroup_id_t subgroup_id, persis if(shard_member_rank == Vc.gmsSST->get_local_index()) { continue; } - // The signature in the other node's "signatures" column corresponds to the version in its "signed_num" column - const persistent::version_t other_signed_version = Vc.gmsSST->signed_num[shard_member_rank][subgroup_id]; + // Read the other node's signed_num column, which should indicate what version has been signed + persistent::version_t other_signed_version = Vc.gmsSST->signed_num[shard_member_rank][subgroup_id]; // If this node hasn't finished signing that version yet, we won't be able to check that the other node's signature // matches the local signature. It also means the minimum signed version can't advance past my_signed_version anyway. 
if(other_signed_version > my_signed_version) { dbg_debug(persistence_logger, "PersistenceManager: Skipping signature check on version {} from node {} because this node hasn't signed that version yet", other_signed_version, Vc.members[shard_member_rank]); continue; } - //Copy out the signature so it can't change during verification + // If the other node hasn't signed anything yet, its signed_num will still be at the initial value of -1, which is not a valid version + if(other_signed_version == -1) { + dbg_debug(persistence_logger, "PersistenceManager: Skipping signature check for node {} because it has not signed any versions yet", Vc.members[shard_member_rank]); + continue; + } + // Attempt to read the signature from the signature column, then check signed_num again + // If the other node updated the signature while we were reading it, signed_num will change, + // so we have to read both of them again std::vector other_signature(signature_size); - gmssst::set(other_signature.data(), - &Vc.gmsSST->signatures[shard_member_rank][subgroup_id * signature_size], - signature_size); + bool consistent_read = false; + while(!consistent_read) { + gmssst::set(other_signature.data(), + &Vc.gmsSST->signatures[shard_member_rank][subgroup_id * signature_size], + signature_size); + persistent::version_t new_signed_version = Vc.gmsSST->signed_num[shard_member_rank][subgroup_id]; + if(new_signed_version == other_signed_version) { + consistent_read = true; + } else { + dbg_debug(persistence_logger, "PersistenceManager: Read signed_num {}, then {} from node {}, trying again", other_signed_version, new_signed_version, Vc.members[shard_member_rank]); + other_signed_version = new_signed_version; + } + } + dbg_debug(persistence_logger, "PersistenceManager: Got a consistent read of signed_num {} and signature from node {}", other_signed_version, Vc.members[shard_member_rank]); + // Retrieve this node's signature on that version std::vector my_signature = 
subgroup_object->get_signature(other_signed_version); if(my_signature.size() == 0) { @@ -252,6 +274,8 @@ void PersistenceManager::handle_verify_request(subgroup_id_t subgroup_id, persis minimum_verified_version = std::min(minimum_verified_version, other_signed_version); } else { dbg_warn(persistence_logger, "Signature for version {} from node {} did not match my signature!", other_signed_version, Vc.members[shard_member_rank]); + dbg_debug(persistence_logger, "my_signature = {:n}", spdlog::to_hex(my_signature)); + dbg_debug(persistence_logger, "other_signature = {:n}", spdlog::to_hex(other_signature)); } } //Update verified_num to the lowest version number that successfully verified across all shard members diff --git a/src/core/view_manager.cpp b/src/core/view_manager.cpp index 26832d94..baae216b 100644 --- a/src/core/view_manager.cpp +++ b/src/core/view_manager.cpp @@ -520,6 +520,7 @@ void ViewManager::finish_setup() { curr_view->gmsSST->push_row_except_slots(); dbg_debug(vm_logger, "Joining node initialized its SST row from the leader"); } + dbg_trace(vm_logger, "Initial SST state: {}", curr_view->gmsSST->to_string()); // Handle any external-client requests that were waiting for the group to start for(auto& external_socket : startup_pending_external_sockets) { @@ -1483,7 +1484,7 @@ void ViewManager::deliver_ragged_trim(DerechoSST& gmsSST) { gmsSST.persisted_num[member_row][subgroup_id]) .second < last_delivered_seq_num) { - dbg_debug(vm_logger, "Waiting for node {} to finish persisting update {}", shard_member, last_delivered_seq_num); + dbg_trace(vm_logger, "Waiting for node {} to finish persisting update {}", shard_member, last_delivered_seq_num); return false; } } @@ -1595,6 +1596,7 @@ void ViewManager::finish_view_change(DerechoSST& gmsSST) { gmsSST.predicates.remove(leader_suspicion_handle); gmsSST.predicates.remove(follower_suspicion_handle); + dbg_trace(vm_logger, "Old SST state at end of view {}: {}", curr_view->vid, gmsSST.to_string()); 
dbg_debug(vm_logger, "Starting creation of new SST and DerechoGroup for view {}", next_view->vid); for(const node_id_t failed_node_id : next_view->departed) { dbg_debug(vm_logger, "Removing global TCP connections for failed node {} from RDMC and SST", failed_node_id); @@ -1647,6 +1649,7 @@ void ViewManager::finish_view_change(DerechoSST& gmsSST) { old_views_cv.notify_all(); } curr_view = std::move(next_view); + dbg_trace(vm_logger, "New SST in view {}: {}", curr_view->vid, curr_view->gmsSST->to_string()); if(any_persistent_objects) { // Write the new view to disk before using it diff --git a/src/sst/lf.cpp b/src/sst/lf.cpp index 8dfc739b..6e4b2e58 100644 --- a/src/sst/lf.cpp +++ b/src/sst/lf.cpp @@ -936,9 +936,13 @@ bool sync(uint32_t r_id) { int s = 0, t = 0; try { if(sst_connections->contains_node(r_id)) { + dbg_trace(g_ctxt.sst_logger, "Sync: exchanging values with node {}", r_id); sst_connections->exchange(r_id, s, t); + dbg_trace(g_ctxt.sst_logger, "Sync: exchange successful with node {}", r_id); } else if(external_client_connections->contains_node(r_id)) { + dbg_trace(g_ctxt.sst_logger, "Sync: exchanging values with external client {}", r_id); external_client_connections->exchange(r_id, s, t); + dbg_trace(g_ctxt.sst_logger, "Sync: exchange successful with external client {}", r_id); } else { return false; }