Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions include/derecho/conf/conf.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,16 @@ class Conf {
std::map<const std::string, std::string> config = {
// [DERECHO]
{DERECHO_CONTACT_IP, "127.0.0.1"},
{DERECHO_CONTACT_PORT, "23580"},
{DERECHO_CONTACT_PORT, "14480"},
{DERECHO_RESTART_LEADERS, "127.0.0.1"},
{DERECHO_RESTART_LEADER_PORTS, "23580"},
{DERECHO_RESTART_LEADER_PORTS, "14480"},
{DERECHO_LOCAL_ID, "0"},
{DERECHO_LOCAL_IP, "127.0.0.1"},
{DERECHO_GMS_PORT, "23580"},
{DERECHO_STATE_TRANSFER_PORT, "28366"},
{DERECHO_SST_PORT, "37683"},
{DERECHO_RDMC_PORT, "31675"},
{DERECHO_EXTERNAL_PORT, "32645"},
{DERECHO_GMS_PORT, "14480"},
{DERECHO_STATE_TRANSFER_PORT, "14560"},
{DERECHO_SST_PORT, "14660"},
{DERECHO_RDMC_PORT, "14720"},
{DERECHO_EXTERNAL_PORT, "14880"},
{SUBGROUP_DEFAULT_RDMC_SEND_ALGORITHM, "binomial_send"},
{DERECHO_P2P_LOOP_BUSY_WAIT_BEFORE_SLEEP_MS, "250"},
{DERECHO_SST_POLL_CQ_TIMEOUT_MS, "2000"},
Expand Down Expand Up @@ -126,7 +126,7 @@ class Conf {
{LOGGER_DEFAULT_LOG_NAME, "derecho_debug"},
{LOGGER_DEFAULT_LOG_LEVEL, "debug"},
{LOGGER_LOG_TO_TERMINAL, "true"},
{LOGGER_LOG_FILE_DEPTH, "3"}};
{LOGGER_LOG_FILE_DEPTH, "10"}};

public:
// the option for parsing command line with getopt(not GetPot!!!)
Expand Down
4 changes: 2 additions & 2 deletions include/derecho/core/detail/derecho_sst.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ class DerechoSST : public sst::SST<DerechoSST> {
void push_row_except_slots();

/**
* Creates a string representation of the local row (not the whole table).
* Creates a string representation of the table for debugging purposes.
* This should be converted to an ostream operator<< to follow standards.
*/
std::string to_string() const;
Expand Down Expand Up @@ -389,7 +389,7 @@ void set(volatile char* string_array, const std::string& value);

void increment(volatile int& member);

bool equals(const volatile char& string_array, const std::string& value);
bool equals(const volatile char* string_array, const std::string& value);

} // namespace gmssst

Expand Down
15 changes: 13 additions & 2 deletions include/derecho/sst/detail/sst_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ void SST<DerivedSST>::put(const std::vector<uint32_t> receiver_ranks, size_t off

template <typename DerivedSST>
void SST<DerivedSST>::put_with_completion(const std::vector<uint32_t> receiver_ranks, size_t offset, size_t size) {
dbg_trace(sst_logger, "put_with_completion called with arguments receiver_ranks={}, offset={}, size={}", receiver_ranks, offset, size);
assert(offset + size <= rowLen);
unsigned int num_writes_posted = 0;
std::vector<bool> posted_write_to(num_members, false);
Expand All @@ -177,6 +178,7 @@ void SST<DerivedSST>::put_with_completion(const std::vector<uint32_t> receiver_r
// perform a remote RDMA write on the owner of the row
ce_ctxt[index].set_remote_id(res_vec[index]->remote_id);
ce_ctxt[index].set_ce_idx(ce_idx);
dbg_trace(sst_logger, "Created a CE context for write to row {}: {:p} -> {{ {}, {}, {} }}", index, static_cast<void*>(&ce_ctxt[index]), ce_ctxt[index].ce_idx(), ce_ctxt[index].remote_id(), ce_ctxt[index].is_managed());
res_vec[index]->post_remote_write_with_completion(&ce_ctxt[index], offset, size);
posted_write_to[index] = true;
num_writes_posted++;
Expand Down Expand Up @@ -216,6 +218,7 @@ void SST<DerivedSST>::put_with_completion(const std::vector<uint32_t> receiver_r
if (result && result.value() == 1) {
polled_successfully_from[index] = true;
} else {
dbg_debug(sst_logger, "put_with_completion marked row {} failed due to not receiving a completion", index);
failed_node_indexes.push_back(index);
}
}
Expand Down Expand Up @@ -264,7 +267,11 @@ void SST<DerivedSST>::sync_with_members() const {
for(auto const& id_index : members_by_id) {
std::tie(node_id, sst_index) = id_index;
if(sst_index != my_index && !row_is_frozen[sst_index]) {
sync(node_id);
dbg_debug(sst_logger, "TCP sync with node {}, for row {}", node_id, sst_index);
bool success = sync(node_id);
if(!success) {
dbg_warn(sst_logger, "TCP sync with node {} was unsuccessful", node_id);
}
}
}
}
Expand All @@ -279,7 +286,11 @@ void SST<DerivedSST>::sync_with_members(std::vector<uint32_t> row_indices) const
continue;
}
if(!row_is_frozen[row_index]) {
sync(members[row_index]);
dbg_debug(sst_logger, "TCP sync with node {}, for row {}", members[row_index], row_index);
bool success = sync(members[row_index]);
if(!success) {
dbg_warn(sst_logger, "TCP sync with node {} was unsuccessful", members[row_index]);
}
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions include/derecho/sst/sst.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ class SST {

DerivedSST* derived_this;

/** Pointer to the logger for the SST module, which is created in sst::lf_initialize() */
std::shared_ptr<spdlog::logger> sst_logger;

std::vector<std::thread> background_threads;
std::atomic<bool> thread_shutdown;

Expand Down Expand Up @@ -236,6 +239,7 @@ class SST {
public:
SST(DerivedSST* derived_class_pointer, const SSTParams& params)
: derived_this(derived_class_pointer),
sst_logger(spdlog::get(LoggerFactory::SST_LOGGER_NAME)),
thread_shutdown(false),
poll_cq_timeout_ms(derecho::getConfUInt32(derecho::Conf::DERECHO_SST_POLL_CQ_TIMEOUT_MS)),
members(params.members),
Expand Down
57 changes: 46 additions & 11 deletions src/applications/tests/unit_tests/signed_log_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,31 +312,35 @@ std::unique_ptr<UnsignedObject> UnsignedObject::from_bytes(mutils::Deserializati
return std::make_unique<UnsignedObject>(*field_ptr, *counter_ptr, test_state_ptr);
}

const int TEST_COORDINATION_PORT = 16000;

/**
* Command-line arguments: <one_field_size> <two_field_size> <unsigned_size> <num_updates>
* Command-line arguments: <one_field_size> <two_field_size> <unsigned_size> <num_updates> <update_size>
* one_field_size: Maximum size of the subgroup that replicates the one-field signed object
* two_field_size: Maximum size of the subgroup that replicates the two-field signed object
* mixed_field_size: Maximum size of the subgroup that replicates the mixed-signed-and-unsigned-field object
* unsigned_size: Maximum size of the subgroup that replicates the persistent-but-not-signed object
* num_updates: Number of randomly-generated 32-byte updates to send to each subgroup
* num_updates: Number of randomly-generated updates to send to each subgroup
* update_size: Size of the updates, in bytes
*/
int main(int argc, char** argv) {
pthread_setname_np(pthread_self(), "test_main");
const std::string characters("abcdefghijklmnopqrstuvwxyz");
std::mt19937 random_generator(getpid());
std::uniform_int_distribution<std::size_t> char_distribution(0, characters.size() - 1);
const int num_args = 5;
const int num_args = 6;
if(argc < (num_args + 1) || (argc > (num_args + 1) && strcmp("--", argv[argc - (num_args + 1)]) != 0)) {
std::cout << "Invalid command line arguments." << std::endl;
std::cout << "Usage: " << argv[0] << " [derecho-config-options -- ] one_field_size two_field_size mixed_field_size unsigned_size num_updates" << std::endl;
std::cout << "Usage: " << argv[0] << " [derecho-config-options -- ] one_field_size two_field_size mixed_field_size unsigned_size num_updates update_size" << std::endl;
return -1;
}

const unsigned int subgroup_1_size = std::stoi(argv[argc - num_args]);
const unsigned int subgroup_2_size = std::stoi(argv[argc - num_args + 1]);
const unsigned int subgroup_mixed_size = std::stoi(argv[argc - num_args + 2]);
const unsigned int subgroup_unsigned_size = std::stoi(argv[argc - num_args + 3]);
const unsigned int num_updates = std::stoi(argv[argc - 1]);
const unsigned int num_updates = std::stoi(argv[argc - num_args + 4]);
const unsigned int update_size = std::stoi(argv[argc - 1]);
derecho::Conf::initialize(argc, argv);

derecho::SubgroupInfo subgroup_info(
Expand Down Expand Up @@ -401,7 +405,7 @@ int main(int argc, char** argv) {
test_state.my_subgroup_is_unsigned = false;
//Send random updates
for(unsigned counter = 0; counter < num_updates; ++counter) {
std::string new_string('a', 32);
std::string new_string('a', update_size);
std::generate(new_string.begin(), new_string.end(),
[&]() { return characters[char_distribution(random_generator)]; });
object_handle.ordered_send<RPC_NAME(update_state)>(new_string);
Expand All @@ -414,8 +418,8 @@ int main(int argc, char** argv) {
test_state.my_subgroup_is_unsigned = false;
//Send random updates
for(unsigned counter = 0; counter < num_updates; ++counter) {
std::string new_foo('a', 32);
std::string new_bar('a', 32);
std::string new_foo('a', update_size);
std::string new_bar('a', update_size);
std::generate(new_foo.begin(), new_foo.end(),
[&]() { return characters[char_distribution(random_generator)]; });
std::generate(new_bar.begin(), new_bar.end(),
Expand All @@ -430,7 +434,7 @@ int main(int argc, char** argv) {
test_state.my_subgroup_is_unsigned = false;
//Send random updates, alternating between the signed, unsigned, and nonpersistent fields
for(unsigned counter = 0; counter < num_updates; ++counter) {
std::string new_string_value('a', 32);
std::string new_string_value('a', update_size);
std::generate(new_string_value.begin(), new_string_value.end(),
[&]() { return characters[char_distribution(random_generator)]; });
if(counter % 3 == 0) {
Expand All @@ -449,7 +453,7 @@ int main(int argc, char** argv) {
test_state.my_subgroup_is_unsigned = true;
//Send random updates
for(unsigned counter = 0; counter < num_updates; ++counter) {
std::string new_string('a', 32);
std::string new_string('a', update_size);
std::generate(new_string.begin(), new_string.end(),
[&]() { return characters[char_distribution(random_generator)]; });
object_handle.ordered_send<RPC_NAME(update_state)>(new_string);
Expand All @@ -464,6 +468,37 @@ int main(int argc, char** argv) {
test_state.subgroup_finished_condition.wait(lock, [&]() { return test_state.subgroup_finished; });
}
std::cout << "Done" << std::endl;
group.barrier_sync();
// If this node is the leader, open a socket and wait for all the other nodes to contact it
// Otherwise, open a socket to the leader and exchange IDs to signal that this node is finished
if(group.get_my_rank() == 0) {
tcp::connection_listener listener_socket(TEST_COORDINATION_PORT);
std::set<derecho::node_id_t> nodes_contacted;
nodes_contacted.emplace(group.get_my_id());
std::vector<derecho::node_id_t> members_vector = group.get_members();
std::set<derecho::node_id_t> all_member_ids(members_vector.begin(), members_vector.end());
std::vector<tcp::socket> member_connections;
std::cout << "Waiting for other nodes to signal they are finished (members = " << members_vector << ")" << std::endl;
while(nodes_contacted != all_member_ids) {
member_connections.emplace_back(listener_socket.accept());
derecho::node_id_t finished_member_id;
member_connections.back().read(finished_member_id);
nodes_contacted.emplace(finished_member_id);
std::cout << "Got a connection from node " << finished_member_id << std::endl;
}
std::cout << "All nodes are done with the test, acknowledging so they can exit" << std::endl;
for(auto& connection : member_connections) {
const int done_signal = 1;
connection.write(done_signal);
}
//member_connections sockets will close automatically at the end of this scope
} else {
derecho::ip_addr_t leader_address = group.get_member_addresses().front().ip_address;
std::cout << "Connecting to leader at " << leader_address << " to signal node " << group.get_my_id() << " is done" << std::endl;
tcp::socket leader_connection(leader_address, TEST_COORDINATION_PORT);
leader_connection.write(group.get_my_id());
std::cout << "Waiting for leader to signal the test is done" << std::endl;
int done;
leader_connection.read(done);
}
group.leave(true);
}
18 changes: 9 additions & 9 deletions src/conf/derecho-sample.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@
# contact ip - the active leader's ip address
contact_ip = 127.0.0.1
# contact port - the active leader's gms port
contact_port = 23580
contact_port = 14480
# list of leaders to contact during a restart in priority order
restart_leaders = 127.0.0.1,127.0.0.1
# list of GMS ports of the restart leaders, in the same order
restart_leader_ports = 23580,23581
restart_leader_ports = 14480,14481
# derecho gms port
gms_port = 23580
gms_port = 14480
# derecho state-transfer port
state_transfer_port = 28366
state_transfer_port = 14560
# sst tcp port
sst_port = 37683
sst_port = 14660
# rdmc tcp port
rdmc_port = 31675
rdmc_port = 14720
# externel tcp port listening to external clients
external_port = 32645
external_port = 14880
# Maximum possible node ID value
# Node IDs are 32-bit integers, but all Derecho systems will have
# many fewer nodes than this. Derecho will pre-allocate space for a
Expand Down Expand Up @@ -145,8 +145,8 @@ persistence_log_level = info
# Whether logs should be printed to the terminal as well as saved to files (default is true)
log_to_terminal = true
# The number of older log files to save. Log files are rotated automatically
# when the current one reaches 1MB in size. Default is 3.
log_file_depth = 3
# when the current one reaches 1MB in size. Default is 10.
log_file_depth = 10

# optional layout configurations
[LAYOUT]
Expand Down
10 changes: 5 additions & 5 deletions src/conf/derecho_node-sample.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ local_ip = 127.0.0.1
# These ports are optional: nodes will use the values from the group derecho.cfg by default,
# but if the port options are specified here they will override the defaults.
# derecho gms port
gms_port = 23580
gms_port = 14481
# derecho state-transfer port
state_transfer_port = 28366
state_transfer_port = 14561
# sst tcp port
sst_port = 37683
sst_port = 14661
# rdmc tcp port
rdmc_port = 31675
rdmc_port = 14721
# externel tcp port listening to external clients
external_port = 32645
external_port = 14881


# RDMA section contains configurations of the following
Expand Down
Loading