From 9587c4a451ee31d1658549a7f5133239df4cf27d Mon Sep 17 00:00:00 2001 From: aliciayuting Date: Mon, 12 Jan 2026 19:18:49 -0500 Subject: [PATCH 1/5] implemented put_by_time --- .../cascade/detail/service_client_impl.hpp | 104 ++++++++++++++++++ include/cascade/service_client.hpp | 60 ++++++++++ include/cascade/utils.hpp | 3 + 3 files changed, 167 insertions(+) diff --git a/include/cascade/detail/service_client_impl.hpp b/include/cascade/detail/service_client_impl.hpp index 4c0269e0..233075da 100644 --- a/include/cascade/detail/service_client_impl.hpp +++ b/include/cascade/detail/service_client_impl.hpp @@ -758,6 +758,110 @@ void ServiceClient::collective_trigger_put( } } +template +template +derecho::rpc::QueryResults ServiceClient::put_by_time( + const ObjectType& value, const uint64_t& timestamp_us, bool as_trigger) { + // STEP 1 - get key + if constexpr (!std::is_base_of_v,ObjectType>) { + throw derecho::derecho_exception(std::string("ServiceClient<>::put_by_time() only support object of type ICascadeObject,but we get ") + typeid(ObjectType).name()); + } + + // STEP 2 - validate timestamp: reject if timestamp_us < get_walltime() - Delta + uint64_t now_us = get_walltime() / 1000ULL; // Convert nanoseconds to microseconds + uint64_t delta_us = PUT_BY_TIME_DELTA_NS / 1000ULL; // Convert nanoseconds to microseconds + + if (timestamp_us < (now_us - delta_us)) { + dbg_default_warn("put_by_time: timestamp {}us is too old (now={}us, delta={}us), rejecting", + timestamp_us, now_us, delta_us); + throw derecho::derecho_exception("put_by_time: timestamp is too old (older than now - delta)"); + } + + // STEP 3 - get shard + uint32_t subgroup_type_index,subgroup_index,shard_index; + std::tie(subgroup_type_index,subgroup_index,shard_index) = this->template key_to_shard(value.get_key_ref()); + + // STEP 4 - call recursive put_by_time + return this->template type_recursive_put_by_time(subgroup_type_index,value,timestamp_us,subgroup_index,shard_index,as_trigger); +} + +template +template +derecho::rpc::QueryResults ServiceClient::put_by_time( + const typename SubgroupType::ObjectType& value, + const uint64_t& timestamp_us, + uint32_t subgroup_index, + uint32_t shard_index, + bool as_trigger) { + LOG_SERVICE_CLIENT_TIMESTAMP(TLT_SERVICE_CLIENT_PUT_BY_TIME_START, + (std::is_base_of::value?value.get_message_id():0)); + // Validate timestamp: timestamp_us must be >= (now - delta) + uint64_t now_ns = get_walltime(); + uint64_t timestamp_ns = timestamp_us * 1000ULL; // Convert microseconds to nanoseconds + uint64_t delta_ns = PUT_BY_TIME_DELTA_NS; + + if (timestamp_ns < now_ns - delta_ns) { + // Timestamp is too old, reject the request + throw derecho::derecho_exception("put_by_time: timestamp is too old (older than now - delta)"); + } + if (!is_external_client()) { + std::lock_guard lck(this->group_ptr_mutex); + if (static_cast(group_ptr->template get_my_shard(subgroup_index)) == shard_index) { + // ordered put_by_time as a shard member - use ordered_send with timestamp + auto& subgroup_handle = group_ptr->template get_subgroup(subgroup_index); + return subgroup_handle.template ordered_send_with_timestamp(timestamp_us, value, as_trigger); + } else { + // p2p put - timestamp not supported for p2p, fall back to regular put + node_id_t node_id = pick_member_by_policy(subgroup_index,shard_index,value.get_key_ref()); + try { + auto& subgroup_handle = group_ptr->template get_subgroup(subgroup_index); + return subgroup_handle.template p2p_send(node_id,value,as_trigger); + } catch (derecho::invalid_subgroup_exception& ex) { + auto& subgroup_handle = group_ptr->template get_nonmember_subgroup(subgroup_index); + return subgroup_handle.template p2p_send(node_id,value,as_trigger); + } + } + } else { + // External client - timestamp not supported, fall back to regular put + std::lock_guard lck(this->external_group_ptr_mutex); + auto& caller = external_group_ptr->template get_subgroup_caller(subgroup_index); + node_id_t node_id = pick_member_by_policy(subgroup_index,shard_index,value.get_key_ref()); + return caller.template p2p_send(node_id,value,as_trigger); + } +} + +template +template +derecho::rpc::QueryResults ServiceClient::type_recursive_put_by_time( + uint32_t type_index, + const ObjectType& object, + const uint64_t& timestamp_us, + uint32_t subgroup_index, + uint32_t shard_index, + bool as_trigger) { + if (type_index == 0) { + return this->template put_by_time(object, timestamp_us, subgroup_index, shard_index, as_trigger); + } else { + return this->template type_recursive_put_by_time(type_index-1,object,timestamp_us,subgroup_index,shard_index,as_trigger); + } +} + +template +template +derecho::rpc::QueryResults ServiceClient::type_recursive_put_by_time( + uint32_t type_index, + const ObjectType& object, + const uint64_t& timestamp_us, + uint32_t subgroup_index, + uint32_t shard_index, + bool as_trigger) { + if (type_index == 0) { + return this->template put_by_time(object, timestamp_us, subgroup_index, shard_index, as_trigger); + } else { + throw derecho::derecho_exception(std::string(__PRETTY_FUNCTION__) + ": type index is out of boundary."); + } +} + template template derecho::rpc::QueryResults ServiceClient::remove( diff --git a/include/cascade/service_client.hpp b/include/cascade/service_client.hpp index ce6e015a..98424dec 100644 --- a/include/cascade/service_client.hpp +++ b/include/cascade/service_client.hpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -519,7 +520,66 @@ class ServiceClient { void collective_trigger_put(const typename SubgroupType::ObjectType& object, uint32_t subgroup_index, std::unordered_map>>& nodes_and_futures); + /** + * "put_by_time" writes an object to a given subgroup/shard with a custom timestamp. + * + * @param[in] object the object to write. + * @param[in] timestamp_us the timestamp in microseconds to use for the message header. + * The request will be rejected if timestamp_us < get_walltime() - Delta. + * @param[in] as_trigger If true, the object will NOT apply to the K/V store. The object will only be + * used to update the state. + */ + template + derecho::rpc::QueryResults put_by_time(const ObjectType& object, const uint64_t& timestamp_us, bool as_trigger = false); + /** + * "put_by_time" writes an object to a given subgroup/shard with a custom timestamp. + * + * @param[in] object the object to write. + * @param[in] timestamp_us the timestamp in microseconds to use for the message header. + * The request will be rejected if timestamp_us < get_walltime() - Delta. + * @param[in] subgroup_index the subgroup index of CascadeType + * @param[in] shard_index the shard index. + * @param[in] as_trigger If true, the object will NOT apply to the K/V store. The object will only be + * used to update the state. + */ + template + derecho::rpc::QueryResults put_by_time(const typename SubgroupType::ObjectType& object, + const uint64_t& timestamp_us, uint32_t subgroup_index, uint32_t shard_index, bool as_trigger = false); + +protected: + /** + * "type_recursive_put" is a helper function for internal use only. + * @param[in] type_index the index of the subgroup type in the CascadeTypes... list. And the FirstType, + * SecondType, ..., RestTypes should be in the same order. + * @param[in] object the object to write + * @param[in] timestamp_us the timestamp in microseconds to use for the message header. + * @param[in] subgroup_index + * the subgroup index in the subgroup type designated by type_index + * @param[in] shard_index the shard index + * @param[in] as_trigger If true, the object will NOT apply to the K/V store. The object will only be + * used to update the state. + * + * @return a future to the version and timestamp of the put operation. + */ + template + derecho::rpc::QueryResults type_recursive_put_by_time( + uint32_t type_index, + const ObjectType& object, + const uint64_t& timestamp_us, + uint32_t subgroup_index, + uint32_t shard_index, + bool as_trigger = false); + + template + derecho::rpc::QueryResults type_recursive_put_by_time( + uint32_t type_index, + const ObjectType& object, + const uint64_t& timestamp_us, + uint32_t subgroup_index, + uint32_t shard_index, + bool as_trigger = false); +public: /** * "remove" deletes an object with the given key. * diff --git a/include/cascade/utils.hpp b/include/cascade/utils.hpp index a39f70a2..9503c042 100644 --- a/include/cascade/utils.hpp +++ b/include/cascade/utils.hpp @@ -35,6 +35,8 @@ inline uint64_t get_time_us(bool use_wall_clock = true) { return get_time_ns(use_wall_clock)/INT64_1E3; } +// 1 second in nanoseconds - used for timestamp validation in put_by_time +const uint64_t PUT_BY_TIME_DELTA_NS = 1000000000ULL; /** * decompose the prefix into tokens. Please note that the token after the last separator is not considered a part of @@ -204,6 +206,7 @@ class OpenLoopLatencyCollector: public OpenLoopLatencyCollectorClient { #define TLT_SERVICE_CLIENT_MULTI_LIST_KEYS_START (1009) #define TLT_SERVICE_CLIENT_GET_SIZE_START (1010) #define TLT_SERVICE_CLIENT_MULTI_GET_SIZE_START (1011) +#define TLT_SERVICE_CLIENT_PUT_BY_TIME_START (1012) /* For VolatileCascadeStore: * ::put(): From b93eab7e9153b2dcb9c1cd1df20f2fbd9d109200 Mon Sep 17 00:00:00 2001 From: aliciayuting Date: Tue, 13 Jan 2026 19:01:53 -0500 Subject: [PATCH 2/5] get_by_time implementation and test cases --- .../cascade/detail/service_client_impl.hpp | 10 +- include/cascade/service_client.hpp | 2 + include/cascade/utils.hpp | 3 - .../CMakeLists.txt | 53 +++ .../consistency_test_cfg/dfgs.json.tmp | 14 + .../consistency_test_cfg/layout.json.tmp | 50 +++ .../consistency_test_cfg/n0/derecho.cfg | 197 +++++++++++ .../consistency_test_cfg/n1/derecho.cfg | 199 ++++++++++++ .../consistency_test_cfg/n2/derecho.cfg | 199 ++++++++++++ .../consistency_test_cfg/n3/derecho.cfg | 199 ++++++++++++ .../consistency_test_cfg/udl_dlls.cfg.tmp | 1 + .../test_get_by_time.cpp | 307 ++++++++++++++++++ .../test_put_by_time.cpp | 210 ++++++++++++ 13 files changed, 1437 insertions(+), 7 deletions(-) create mode 100644 src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/dfgs.json.tmp create mode 100644 src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/layout.json.tmp create mode 100644 src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n0/derecho.cfg create mode 100644 src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n1/derecho.cfg create mode 100644 src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n2/derecho.cfg create mode 100644 src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n3/derecho.cfg create mode 100644 src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/udl_dlls.cfg.tmp create mode 100644 src/applications/tests/cascade_as_subgroup_classes/test_get_by_time.cpp create mode 100644 src/applications/tests/cascade_as_subgroup_classes/test_put_by_time.cpp diff --git a/include/cascade/detail/service_client_impl.hpp b/include/cascade/detail/service_client_impl.hpp index 233075da..d2357a3d 100644 --- a/include/cascade/detail/service_client_impl.hpp +++ b/include/cascade/detail/service_client_impl.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -64,7 +65,8 @@ std::unique_ptr client_stub_factory() { template ServiceClient::ServiceClient(derecho::Group, CascadeTypes...>* _group_ptr) : external_group_ptr(nullptr), - group_ptr(_group_ptr) { + group_ptr(_group_ptr), + server_clock_skew_delta_us(derecho::getConfUInt64(derecho::Conf::PERS_SERVER_CLOCK_SKEW_DELTA_US)) { if(group_ptr == nullptr) { this->external_group_ptr = std::make_unique, CascadeTypes...>>( client_stub_factory>, @@ -769,7 +771,7 @@ derecho::rpc::QueryResults ServiceClient::put_by // STEP 2 - validate timestamp: reject if timestamp_us < get_walltime() - Delta uint64_t now_us = get_walltime() / 1000ULL; // Convert nanoseconds to microseconds - uint64_t delta_us = PUT_BY_TIME_DELTA_NS / 1000ULL; // Convert nanoseconds to microseconds + uint64_t delta_us = server_clock_skew_delta_us / 1000ULL; // Convert nanoseconds to microseconds if (timestamp_us < (now_us - delta_us)) { dbg_default_warn("put_by_time: timestamp {}us is too old (now={}us, delta={}us), rejecting", @@ -798,8 +800,8 @@ derecho::rpc::QueryResults ServiceClient::put_by // Validate timestamp: timestamp_us must be >= (now - delta) uint64_t now_ns = get_walltime(); uint64_t timestamp_ns = timestamp_us * 1000ULL; // Convert microseconds to nanoseconds - uint64_t delta_ns = PUT_BY_TIME_DELTA_NS; - + uint64_t delta_ns = server_clock_skew_delta_us * 1000ULL; + if (timestamp_ns < now_ns - delta_ns) { // Timestamp is too old, reject the request throw derecho::derecho_exception("put_by_time: timestamp is too old (older than now - delta)"); diff --git a/include/cascade/service_client.hpp b/include/cascade/service_client.hpp index 98424dec..8da56664 100644 --- a/include/cascade/service_client.hpp +++ b/include/cascade/service_client.hpp @@ -119,6 +119,8 @@ class ServiceClient { do_hash>> member_cache; mutable std::shared_mutex member_cache_mutex; + // config clock skew delta (in microseconds) for time consistency + const uint64_t server_clock_skew_delta_us; /** * 'object_pool_info_cache' is a local cache for object pool metadata. This cache is used to accelerate the * object access process. If an object pool does not exists, it will be loaded from metadata service. diff --git a/include/cascade/utils.hpp b/include/cascade/utils.hpp index 9503c042..8f2a5c1c 100644 --- a/include/cascade/utils.hpp +++ b/include/cascade/utils.hpp @@ -35,9 +35,6 @@ inline uint64_t get_time_us(bool use_wall_clock = true) { return get_time_ns(use_wall_clock)/INT64_1E3; } -// 1 second in nanoseconds - used for timestamp validation in put_by_time -const uint64_t PUT_BY_TIME_DELTA_NS = 1000000000ULL; - /** * decompose the prefix into tokens. Please note that the token after the last separator is not considered a part of * the prefix and hence dropped if the "prefix_only" is true diff --git a/src/applications/tests/cascade_as_subgroup_classes/CMakeLists.txt b/src/applications/tests/cascade_as_subgroup_classes/CMakeLists.txt index 70004723..358cb547 100644 --- a/src/applications/tests/cascade_as_subgroup_classes/CMakeLists.txt +++ b/src/applications/tests/cascade_as_subgroup_classes/CMakeLists.txt @@ -23,3 +23,56 @@ add_custom_command(TARGET cli_example POST_BUILD COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/perf_shutdown.py ${CMAKE_CURRENT_BINARY_DIR}/perf_shutdown.py ) + +# Add test_get_by_time executable +add_executable(test_get_by_time test_get_by_time.cpp) +target_include_directories(test_get_by_time PRIVATE + $ + $ + $ + $ +) +target_link_libraries(test_get_by_time cascade pthread) + +# Test for put_by_time API +add_executable(test_put_by_time test_put_by_time.cpp) +target_include_directories(test_put_by_time PRIVATE + $ + $ + $ +) +target_link_libraries(test_put_by_time cascade) + +add_custom_command(TARGET test_put_by_time POST_BUILD + COMMAND ${CMAKE_COMMAND} -E copy_directory ${CMAKE_CURRENT_SOURCE_DIR}/consistency_test_cfg + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/layout.json.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n0/layout.json + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/layout.json.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n1/layout.json + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/layout.json.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n2/layout.json + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/layout.json.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n3/layout.json + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/dfgs.json.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n0/dfgs.json + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/dfgs.json.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n1/dfgs.json + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/dfgs.json.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n2/dfgs.json + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/dfgs.json.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n3/dfgs.json + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/udl_dlls.cfg.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n0/udl_dlls.cfg + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/udl_dlls.cfg.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n1/udl_dlls.cfg + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/udl_dlls.cfg.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n2/udl_dlls.cfg + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/udl_dlls.cfg.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n3/udl_dlls.cfg + DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/consistency_test_cfg/run.sh.tmp + ${CMAKE_CURRENT_SOURCE_DIR}/consistency_test_cfg/layout.json.tmp + ${CMAKE_CURRENT_SOURCE_DIR}/consistency_test_cfg/dfgs.json.tmp + ${CMAKE_CURRENT_SOURCE_DIR}/consistency_test_cfg/udl_dlls.cfg.tmp + COMMENT "prepare test_put_by_time configuration" +) diff --git a/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/dfgs.json.tmp b/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/dfgs.json.tmp new file mode 100644 index 00000000..9889f609 --- /dev/null +++ b/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/dfgs.json.tmp @@ -0,0 +1,14 @@ +[ + { + "id": "4a2ec3ae-9cd6-11eb-90e5-0242ac110002", + "desc": "console_printer DFG", + "graph": [ + { + "pathname": "/console_printer", + "shard_dispatcher_list": ["one"], + "user_defined_logic_list": ["48e60f7c-8500-11eb-8755-0242ac110002"], + "destinations": [{}] + } + ] + } +] diff --git a/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/layout.json.tmp b/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/layout.json.tmp new file mode 100644 index 00000000..7bdb1016 --- /dev/null +++ b/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/layout.json.tmp @@ -0,0 +1,50 @@ +[ + { + "type_alias": "CascadeMetadataService", + "layout": [ + { + "min_nodes_by_shard": ["1"], + "max_nodes_by_shard": ["1"], + "delivery_modes_by_shard": ["Ordered"], + "reserved_node_ids_by_shard": [["0"]], + "profiles_by_shard": ["DEFAULT"] + } + ] + }, + { + "type_alias": "VolatileCascadeStoreWithStringKey", + "layout": [ + { + "min_nodes_by_shard": ["2"], + "max_nodes_by_shard": ["2"], + "delivery_modes_by_shard": ["Ordered"], + "reserved_node_ids_by_shard": [["1","2"]], + "profiles_by_shard": ["DEFAULT"] + } + ] + }, + { + "type_alias": "PersistentCascadeStoreWithStringKey", + "layout": [ + { + "min_nodes_by_shard": ["2"], + "max_nodes_by_shard": ["2"], + "delivery_modes_by_shard": ["Ordered"], + "reserved_node_ids_by_shard": [["1","2"]], + "profiles_by_shard": ["DEFAULT"] + } + ] + }, + { + "type_alias": "TriggerCascadeNoStoreWithStringKey", + "layout": [ + { + "min_nodes_by_shard": ["2"], + "max_nodes_by_shard": ["2"], + "delivery_modes_by_shard": ["Raw"], + "reserved_node_ids_by_shard": [["1","2"]], + "profiles_by_shard": ["DEFAULT"] + } + ] + } +]' diff --git a/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n0/derecho.cfg b/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n0/derecho.cfg new file mode 100644 index 00000000..b4cc2ab4 --- /dev/null +++ b/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n0/derecho.cfg @@ -0,0 +1,197 @@ +[DERECHO] +# leader ip - the leader's ip address +contact_ip = 127.0.0.1 +# leader gms port - the leader's gms port +contact_port = 23580 +# my local id - each node should have a different id +local_id = 0 +# my local ip address +local_ip = 127.0.0.1 +# derecho gms port +gms_port = 23580 +# derecho rpc port +state_transfer_port = 28366 +# sst tcp port +sst_port = 37683 +# rdmc tcp port +rdmc_port = 31675 +# external port +external_port = 32645 +# this is the frequency of the failure detector thread. +# It is best to leave this to 1 ms for RDMA. If it is too high, +# you run the risk of overflowing the queue of outstanding sends. +heartbeat_ms = 100 +# sst poll completion queue timeout in millisecond +sst_poll_cq_timeout_ms = 100 +# disable partitioning safety +# By disabling this feature, the derecho is allowed to run when active +# members cannot form a majority. Please be aware of the 'split-brain' +# syndrome:https://en.wikipedia.org/wiki/Split-brain and make sure your +# application is fine with it. +# To help the user play with derecho at beginning, we disabled the +# partitioning safety. We suggest to set it to false for serious deployment +disable_partitioning_safety = false + +# maximum payload size for P2P requests +max_p2p_request_payload_size = 1048576 +# maximum payload size for P2P replies +max_p2p_reply_payload_size = 10240 +# window size for P2P requests and replies +p2p_window_size = 4 + +# Subgroup configurations +# - The default subgroup settings +[SUBGROUP/DEFAULT] +# maximum payload size +# Any message with size large than this has to be broken +# down to multiple messages. +# Large message consumes memory space because the memory buffers +# have to be pre-allocated. +max_payload_size = 1048576 +# maximum reply payload size +# This is for replies generated by ordered sends in the subgroup +max_reply_payload_size = 10240 +# maximum smc (SST's small message multicast) payload size +# If the message size is smaller or equal to this size, +# it will be sent using SST multicast, otherwise it will +# try RDMC if the message size is smaller than max_payload_size. +max_smc_payload_size = 10240 +# block size depends on your max_payload_size. +# It is only relevant if you are ever going to send a message using RDMC. +# In that case, it should be set to the same value as the max_payload_size, +# if the max_payload_size is around 1 MB. For very large messages, the block # size should be a few MBs (1 is fine). +block_size = 1048576 +# message window size +# the length of the message pipeline +window_size = 16 +# the send algorithm for RDMC. Other options are +# chain_send, sequential_send, tree_send +rdmc_send_algorithm = binomial_send +# - SAMPLE for large message settings +[SUBGROUP/LARGE] +max_payload_size = 102400 +max_reply_payload_size = 102400 +max_smc_payload_size = 10240 +block_size = 10240 +window_size = 3 +rdmc_send_algorithm = binomial_send +# - SAMPLE for small message settings +[SUBGROUP/SMALL] +max_payload_size = 100 +max_reply_payload_size = 100 +max_smc_payload_size = 100 +# does not matter unless max_payload_size > max_smc_payload_size +block_size = 0 +window_size = 50 +rdmc_send_algorithm = binomial_send + +[SUBGROUP/VCS] +max_payload_size = 1048576 +max_reply_payload_size = 8192 +max_smc_payload_size = 10240 +# does not matter unless max_payload_size > max_smc_payload_size +block_size = 1048576 +window_size = 8 +rdmc_send_algorithm = binomial_send +num_shards = 1 +min_nodes = 1 +max_nodes = 4 + +[SUBGROUP/PCS] +max_payload_size = 8192 +max_reply_payload_size = 8192 +max_smc_payload_size = 10240 +# does not matter unless max_payload_size > max_smc_payload_size +block_size = 1048576 +window_size = 50 +rdmc_send_algorithm = binomial_send +num_shards = 1 +min_nodes = 1 +max_nodes = 4 + +# RDMA section contains configurations of the following +# - which RDMA device to use +# - device configurations +[RDMA] +# 1. provider = bgq|gni|mlx|netdir|psm|psm2|rxd|rxm|shm|tcp|udp|usnic|verbs +# possible options(only 'tcp' and 'verbs' providers are tested so far): +# bgq - The Blue Gene/Q Fabric Provider +# gni - The GNI Fabric Provider (Cray XC (TM) systems) +# mlx - The MLX Fabric Provider (UCX library) +# netdir - The Network Direct Fabric Provider (Microsoft Network Direct SPI) +# psm - The PSM Fabric Provider +# psm2 - The PSM2 Fabric Provider +# rxd - The RxD (RDM over DGRAM) Utility Provider +# rxm - The RxM (RDM over MSG) Utility Provider +# shm - The SHM Fabric Provider +# tcp - The Sockets Fabric Provider (TCP) +# udp - The UDP Fabric Provider +# usnic - The usNIC Fabric Provider (Cisco VIC) +# verbs - The Verbs Fabric Provider +provider = tcp + +# 2. domain +# For tcp provider, domain is the NIC name (ifconfig | grep -v -e "^ ") +# For verbs provider, domain is the device name (ibv_devices) +domain = lo + +# 3. tx_depth +# tx_depth applies to hints->tx_attr->size, where hint is a struct fi_info object. +# see https://ofiwg.github.io/libfabric/master/man/fi_getinfo.3.html +tx_depth = 256 + +# 4. rx_depth: +# rx_depth applies to hints->rx_attr->size, where hint is a struct fi_info object. +# see https://ofiwg.github.io/libfabric/master/man/fi_getinfo.3.html +rx_depth = 256 + +# Persistent configurations +[PERS] +# persistent directory for file system-based logfile. +file_path = .plog +ramdisk_path = /dev/shm/volatile_t +# Reset persistent data +# CAUTION: "reset = true" removes existing persisted data!!! +reset = false +# Max number of the log entries in each persistent, default to 1048576 +max_log_entry = 1048576 +# Max data size in bytes for each persistent, default to 512GB +max_data_size = 549755813888 +# Server clock skew delta in microseconds for put_by_time timestamp validation +server_clock_skew_delta_us = 1000000 +# Temporal consistency delta +temporal_consistency_delta_us = 1000000 + + +# Logger configurations +[LOGGER] +# default log name +default_log_name = derecho_debug +# default log level +# Available options: +# trace,debug,info,warn,error,critical,off +default_log_level = off + +[LAYOUT] +json_layout_file = 'layout.json' + +# cascade service configuration +# TODO: add document for how to setup a cascade service. +[CASCADE] +cpu_cores = 0-3 +num_stateless_workers_for_multicast_ocdp = 2 +num_stateless_workers_for_p2p_ocdp = 2 +num_stateful_workers_for_multicast_ocdp = 2 +num_stateful_workers_for_p2p_ocdp = 2 +worker_cpu_affinity = ' +{ + "multicast_ocdp" : { + "0":"0", + "1":"1" + }, + "p2p_ocdp" : { + "0":"2", + "1":"3" + } +} +' diff --git a/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n1/derecho.cfg b/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n1/derecho.cfg new file mode 100644 index 00000000..a83c9e05 --- /dev/null +++ b/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n1/derecho.cfg @@ -0,0 +1,199 @@ +[DERECHO] +# leader ip - the leader's ip address +contact_ip = 127.0.0.1 +# leader gms port - the leader's gms port +contact_port = 23580 +# my local id - each node should have a different id +local_id = 1 +# my local ip address +local_ip = 127.0.0.1 +# derecho gms port +gms_port = 23581 +# derecho rpc port +state_transfer_port = 28367 +# sst tcp port +sst_port = 37684 +# rdmc tcp port +rdmc_port = 31674 +# external port +external_port = 32646 +# this is the frequency of the failure detector thread. +# It is best to leave this to 1 ms for RDMA. If it is too high, +# you run the risk of overflowing the queue of outstanding sends. +heartbeat_ms = 100 +# sst poll completion queue timeout in millisecond +sst_poll_cq_timeout_ms = 100 +# disable partitioning safety +# By disabling this feature, the derecho is allowed to run when active +# members cannot form a majority. Please be aware of the 'split-brain' +# syndrome:https://en.wikipedia.org/wiki/Split-brain and make sure your +# application is fine with it. +# To help the user play with derecho at beginning, we disabled the +# partitioning safety. We suggest to set it to false for serious deployment +disable_partitioning_safety = false + +# maximum payload size for P2P requests +max_p2p_request_payload_size = 1048576 +# maximum payload size for P2P replies +max_p2p_reply_payload_size = 10240 +# window size for P2P requests and replies +p2p_window_size = 4 + +# Subgroup configurations +# - The default subgroup settings +[SUBGROUP/DEFAULT] +# maximum payload size +# Any message with size large than this has to be broken +# down to multiple messages. +# Large message consumes memory space because the memory buffers +# have to be pre-allocated. +max_payload_size = 1048576 +# maximum reply payload size +# This is for replies generated by ordered sends in the subgroup +max_reply_payload_size = 10240 +# maximum smc (SST's small message multicast) payload size +# If the message size is smaller or equal to this size, +# it will be sent using SST multicast, otherwise it will +# try RDMC if the message size is smaller than max_payload_size. +max_smc_payload_size = 10240 +# block size depends on your max_payload_size. +# It is only relevant if you are ever going to send a message using RDMC. +# In that case, it should be set to the same value as the max_payload_size, +# if the max_payload_size is around 1 MB. For very large messages, the block # size should be a few MBs (1 is fine). +block_size = 1048576 +# message window size +# the length of the message pipeline +window_size = 16 +# the send algorithm for RDMC. Other options are +# chain_send, sequential_send, tree_send +rdmc_send_algorithm = binomial_send +# - SAMPLE for large message settings +[SUBGROUP/LARGE] +max_payload_size = 102400 +max_reply_payload_size = 102400 +max_smc_payload_size = 10240 +block_size = 10240 +window_size = 3 +rdmc_send_algorithm = binomial_send +# - SAMPLE for small message settings +[SUBGROUP/SMALL] +max_payload_size = 100 +max_reply_payload_size = 100 +max_smc_payload_size = 100 +# does not matter unless max_payload_size > max_smc_payload_size +block_size = 0 +window_size = 50 +rdmc_send_algorithm = binomial_send + +[SUBGROUP/VCS] +max_payload_size = 1048576 +max_reply_payload_size = 8192 +max_smc_payload_size = 10240 +# does not matter unless max_payload_size > max_smc_payload_size +block_size = 1048576 +window_size = 8 +rdmc_send_algorithm = binomial_send +num_shards = 1 +min_nodes = 1 +max_nodes = 4 + +[SUBGROUP/PCS] +max_payload_size = 8192 +max_reply_payload_size = 8192 +max_smc_payload_size = 10240 +# does not matter unless max_payload_size > max_smc_payload_size +block_size = 1048576 +window_size = 50 +rdmc_send_algorithm = binomial_send +num_shards = 1 +min_nodes = 1 +max_nodes = 4 + +# RDMA section contains configurations of the following +# - which RDMA device to use +# - device configurations +[RDMA] +# 1. provider = bgq|gni|mlx|netdir|psm|psm2|rxd|rxm|shm|tcp|udp|usnic|verbs +# possible options(only 'tcp' and 'verbs' providers are tested so far): +# bgq - The Blue Gene/Q Fabric Provider +# gni - The GNI Fabric Provider (Cray XC (TM) systems) +# mlx - The MLX Fabric Provider (UCX library) +# netdir - The Network Direct Fabric Provider (Microsoft Network Direct SPI) +# psm - The PSM Fabric Provider +# psm2 - The PSM2 Fabric Provider +# rxd - The RxD (RDM over DGRAM) Utility Provider +# rxm - The RxM (RDM over MSG) Utility Provider +# shm - The SHM Fabric Provider +# tcp - The Sockets Fabric Provider (TCP) +# udp - The UDP Fabric Provider +# usnic - The usNIC Fabric Provider (Cisco VIC) +# verbs - The Verbs Fabric Provider +provider = tcp + +# 2. domain +# For tcp provider, domain is the NIC name (ifconfig | grep -v -e "^ ") +# For verbs provider, domain is the device name (ibv_devices) +domain = lo + +# 3. tx_depth +# tx_depth applies to hints->tx_attr->size, where hint is a struct fi_info object. +# see https://ofiwg.github.io/libfabric/master/man/fi_getinfo.3.html +tx_depth = 256 + +# 4. rx_depth: +# rx_depth applies to hints->rx_attr->size, where hint is a struct fi_info object. +# see https://ofiwg.github.io/libfabric/master/man/fi_getinfo.3.html +rx_depth = 256 + +# Persistent configurations +[PERS] +# persistent directory for file system-based logfile. +file_path = .plog +ramdisk_path = /dev/shm/volatile_t +# Reset persistent data +# CAUTION: "reset = true" removes existing persisted data!!! +reset = false +# Max number of the log entries in each persistent, default to 1048576 +max_log_entry = 1048576 +# Max data size in bytes for each persistent, default to 512GB +max_data_size = 549755813888 +# Server clock skew delta in microseconds for put_by_time timestamp validation +server_clock_skew_delta_us = 1000000 +# Temporal consistency delta +temporal_consistency_delta_us = 1000000 + + +# Logger configurations +[LOGGER] +# default log name +default_log_name = derecho_debug +# default log level +# Available options: +# trace,debug,info,warn,error,critical,off +default_log_level = off + + +[LAYOUT] +json_layout_file = 'layout.json' + +# cascade service configuration +# TODO: add document for how to setup a cascade service. +[CASCADE] +cpu_cores = 0-3 +num_stateless_workers_for_multicast_ocdp = 2 +num_stateless_workers_for_p2p_ocdp = 2 +num_stateful_workers_for_multicast_ocdp = 2 +num_stateful_workers_for_p2p_ocdp = 2 +worker_cpu_affinity = ' +{ + "multicast_ocdp" : { + "0":"0", + "1":"1" + }, + "p2p_ocdp" : { + "0":"2", + "1":"3" + } +} +' + diff --git a/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n2/derecho.cfg b/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n2/derecho.cfg new file mode 100644 index 00000000..5609422d --- /dev/null +++ b/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n2/derecho.cfg @@ -0,0 +1,199 @@ +[DERECHO] +# leader ip - the leader's ip address +contact_ip = 127.0.0.1 +# leader gms port - the leader's gms port +contact_port = 23580 +# my local id - each node should have a different id +local_id = 2 +# my local ip address +local_ip = 127.0.0.1 +# derecho gms port +gms_port = 23582 +# derecho rpc port +state_transfer_port = 28368 +# sst tcp port +sst_port = 37685 +# rdmc tcp port +rdmc_port = 31677 +# external port +external_port = 32647 +# this is the frequency of the failure detector thread. +# It is best to leave this to 1 ms for RDMA. If it is too high, +# you run the risk of overflowing the queue of outstanding sends. +heartbeat_ms = 100 +# sst poll completion queue timeout in millisecond +sst_poll_cq_timeout_ms = 100 +# disable partitioning safety +# By disabling this feature, the derecho is allowed to run when active +# members cannot form a majority. Please be aware of the 'split-brain' +# syndrome:https://en.wikipedia.org/wiki/Split-brain and make sure your +# application is fine with it. +# To help the user play with derecho at beginning, we disabled the +# partitioning safety. We suggest to set it to false for serious deployment +disable_partitioning_safety = false + +# maximum payload size for P2P requests +max_p2p_request_payload_size = 1048576 +# maximum payload size for P2P replies +max_p2p_reply_payload_size = 10240 +# window size for P2P requests and replies +p2p_window_size = 4 + +# Subgroup configurations +# - The default subgroup settings +[SUBGROUP/DEFAULT] +# maximum payload size +# Any message with size large than this has to be broken +# down to multiple messages. +# Large message consumes memory space because the memory buffers +# have to be pre-allocated. +max_payload_size = 1048576 +# maximum reply payload size +# This is for replies generated by ordered sends in the subgroup +max_reply_payload_size = 10240 +# maximum smc (SST's small message multicast) payload size +# If the message size is smaller or equal to this size, +# it will be sent using SST multicast, otherwise it will +# try RDMC if the message size is smaller than max_payload_size. +max_smc_payload_size = 10240 +# block size depends on your max_payload_size. +# It is only relevant if you are ever going to send a message using RDMC. +# In that case, it should be set to the same value as the max_payload_size, +# if the max_payload_size is around 1 MB. For very large messages, the block # size should be a few MBs (1 is fine). +block_size = 1048576 +# message window size +# the length of the message pipeline +window_size = 16 +# the send algorithm for RDMC. Other options are +# chain_send, sequential_send, tree_send +rdmc_send_algorithm = binomial_send +# - SAMPLE for large message settings +[SUBGROUP/LARGE] +max_payload_size = 102400 +max_reply_payload_size = 102400 +max_smc_payload_size = 10240 +block_size = 10240 +window_size = 3 +rdmc_send_algorithm = binomial_send +# - SAMPLE for small message settings +[SUBGROUP/SMALL] +max_payload_size = 100 +max_reply_payload_size = 100 +max_smc_payload_size = 100 +# does not matter unless max_payload_size > max_smc_payload_size +block_size = 0 +window_size = 50 +rdmc_send_algorithm = binomial_send + +[SUBGROUP/VCS] +max_payload_size = 1048576 +max_reply_payload_size = 8192 +max_smc_payload_size = 10240 +# does not matter unless max_payload_size > max_smc_payload_size +block_size = 1048576 +window_size = 8 +rdmc_send_algorithm = binomial_send +num_shards = 1 +min_nodes = 1 +max_nodes = 4 + +[SUBGROUP/PCS] +max_payload_size = 8192 +max_reply_payload_size = 8192 +max_smc_payload_size = 10240 +# does not matter unless max_payload_size > max_smc_payload_size +block_size = 1048576 +window_size = 50 +rdmc_send_algorithm = binomial_send +num_shards = 1 +min_nodes = 1 +max_nodes = 4 + +# RDMA section contains configurations of the following +# - which RDMA device to use +# - device configurations +[RDMA] +# 1. provider = bgq|gni|mlx|netdir|psm|psm2|rxd|rxm|shm|tcp|udp|usnic|verbs +# possible options(only 'tcp' and 'verbs' providers are tested so far): +# bgq - The Blue Gene/Q Fabric Provider +# gni - The GNI Fabric Provider (Cray XC (TM) systems) +# mlx - The MLX Fabric Provider (UCX library) +# netdir - The Network Direct Fabric Provider (Microsoft Network Direct SPI) +# psm - The PSM Fabric Provider +# psm2 - The PSM2 Fabric Provider +# rxd - The RxD (RDM over DGRAM) Utility Provider +# rxm - The RxM (RDM over MSG) Utility Provider +# shm - The SHM Fabric Provider +# tcp - The Sockets Fabric Provider (TCP) +# udp - The UDP Fabric Provider +# usnic - The usNIC Fabric Provider (Cisco VIC) +# verbs - The Verbs Fabric Provider +provider = tcp + +# 2. domain +# For tcp provider, domain is the NIC name (ifconfig | grep -v -e "^ ") +# For verbs provider, domain is the device name (ibv_devices) +domain = lo + +# 3. tx_depth +# tx_depth applies to hints->tx_attr->size, where hint is a struct fi_info object. +# see https://ofiwg.github.io/libfabric/master/man/fi_getinfo.3.html +tx_depth = 256 + +# 4. rx_depth: +# rx_depth applies to hints->rx_attr->size, where hint is a struct fi_info object. +# see https://ofiwg.github.io/libfabric/master/man/fi_getinfo.3.html +rx_depth = 256 + +# Persistent configurations +[PERS] +# persistent directory for file system-based logfile. +file_path = .plog +ramdisk_path = /dev/shm/volatile_t +# Reset persistent data +# CAUTION: "reset = true" removes existing persisted data!!! +reset = false +# Max number of the log entries in each persistent, default to 1048576 +max_log_entry = 1048576 +# Max data size in bytes for each persistent, default to 512GB +max_data_size = 549755813888 +# Server clock skew delta in microseconds for put_by_time timestamp validation +server_clock_skew_delta_us = 1000000 +# Temporal consistency delta +temporal_consistency_delta_us = 1000000 + + +# Logger configurations +[LOGGER] +# default log name +default_log_name = derecho_debug +# default log level +# Available options: +# trace,debug,info,warn,error,critical,off +default_log_level = off + + +[LAYOUT] +json_layout_file = 'layout.json' + +# cascade service configuration +# TODO: add document for how to setup a cascade service. +[CASCADE] +cpu_cores = 0-3 +num_stateless_workers_for_multicast_ocdp = 2 +num_stateless_workers_for_p2p_ocdp = 2 +num_stateful_workers_for_multicast_ocdp = 2 +num_stateful_workers_for_p2p_ocdp = 2 +worker_cpu_affinity = ' +{ + "multicast_ocdp" : { + "0":"0", + "1":"1" + }, + "p2p_ocdp" : { + "0":"2", + "1":"3" + } +} +' + diff --git a/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n3/derecho.cfg b/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n3/derecho.cfg new file mode 100644 index 00000000..73430c4f --- /dev/null +++ b/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n3/derecho.cfg @@ -0,0 +1,199 @@ +[DERECHO] +# leader ip - the leader's ip address +contact_ip = 127.0.0.1 +# leader gms port - the leader's gms port +contact_port = 23580 +# my local id - each node should have a different id +local_id = 3 +# my local ip address +local_ip = 127.0.0.1 +# derecho gms port +gms_port = 23583 +# derecho rpc port +state_transfer_port = 28369 +# sst tcp port +sst_port = 37686 +# rdmc tcp port +rdmc_port = 31678 +# external port +external_port = 32648 +# this is the frequency of the failure detector thread. +# It is best to leave this to 1 ms for RDMA. If it is too high, +# you run the risk of overflowing the queue of outstanding sends. +heartbeat_ms = 100 +# sst poll completion queue timeout in millisecond +sst_poll_cq_timeout_ms = 100 +# disable partitioning safety +# By disabling this feature, the derecho is allowed to run when active +# members cannot form a majority. Please be aware of the 'split-brain' +# syndrome:https://en.wikipedia.org/wiki/Split-brain and make sure your +# application is fine with it. +# To help the user play with derecho at beginning, we disabled the +# partitioning safety. We suggest to set it to false for serious deployment +disable_partitioning_safety = false + +# maximum payload size for P2P requests +max_p2p_request_payload_size = 1048576 +# maximum payload size for P2P replies +max_p2p_reply_payload_size = 10240 +# window size for P2P requests and replies +p2p_window_size = 4 + +# Subgroup configurations +# - The default subgroup settings +[SUBGROUP/DEFAULT] +# maximum payload size +# Any message with size large than this has to be broken +# down to multiple messages. +# Large message consumes memory space because the memory buffers +# have to be pre-allocated. +max_payload_size = 1048576 +# maximum reply payload size +# This is for replies generated by ordered sends in the subgroup +max_reply_payload_size = 10240 +# maximum smc (SST's small message multicast) payload size +# If the message size is smaller or equal to this size, +# it will be sent using SST multicast, otherwise it will +# try RDMC if the message size is smaller than max_payload_size. +max_smc_payload_size = 10240 +# block size depends on your max_payload_size. +# It is only relevant if you are ever going to send a message using RDMC. +# In that case, it should be set to the same value as the max_payload_size, +# if the max_payload_size is around 1 MB. For very large messages, the block # size should be a few MBs (1 is fine). +block_size = 1048576 +# message window size +# the length of the message pipeline +window_size = 16 +# the send algorithm for RDMC. Other options are +# chain_send, sequential_send, tree_send +rdmc_send_algorithm = binomial_send +# - SAMPLE for large message settings +[SUBGROUP/LARGE] +max_payload_size = 102400 +max_reply_payload_size = 102400 +max_smc_payload_size = 10240 +block_size = 10240 +window_size = 3 +rdmc_send_algorithm = binomial_send +# - SAMPLE for small message settings +[SUBGROUP/SMALL] +max_payload_size = 100 +max_reply_payload_size = 100 +max_smc_payload_size = 100 +# does not matter unless max_payload_size > max_smc_payload_size +block_size = 0 +window_size = 50 +rdmc_send_algorithm = binomial_send + +[SUBGROUP/VCS] +max_payload_size = 1048576 +max_reply_payload_size = 8192 +max_smc_payload_size = 10240 +# does not matter unless max_payload_size > max_smc_payload_size +block_size = 1048576 +window_size = 8 +rdmc_send_algorithm = binomial_send +num_shards = 1 +min_nodes = 1 +max_nodes = 4 + +[SUBGROUP/PCS] +max_payload_size = 8192 +max_reply_payload_size = 8192 +max_smc_payload_size = 10240 +# does not matter unless max_payload_size > max_smc_payload_size +block_size = 1048576 +window_size = 50 +rdmc_send_algorithm = binomial_send +num_shards = 1 +min_nodes = 1 +max_nodes = 4 + +# RDMA section contains configurations of the following +# - which RDMA device to use +# - device configurations +[RDMA] +# 1. provider = bgq|gni|mlx|netdir|psm|psm2|rxd|rxm|shm|tcp|udp|usnic|verbs +# possible options(only 'tcp' and 'verbs' providers are tested so far): +# bgq - The Blue Gene/Q Fabric Provider +# gni - The GNI Fabric Provider (Cray XC (TM) systems) +# mlx - The MLX Fabric Provider (UCX library) +# netdir - The Network Direct Fabric Provider (Microsoft Network Direct SPI) +# psm - The PSM Fabric Provider +# psm2 - The PSM2 Fabric Provider +# rxd - The RxD (RDM over DGRAM) Utility Provider +# rxm - The RxM (RDM over MSG) Utility Provider +# shm - The SHM Fabric Provider +# tcp - The Sockets Fabric Provider (TCP) +# udp - The UDP Fabric Provider +# usnic - The usNIC Fabric Provider (Cisco VIC) +# verbs - The Verbs Fabric Provider +provider = tcp + +# 2. domain +# For tcp provider, domain is the NIC name (ifconfig | grep -v -e "^ ") +# For verbs provider, domain is the device name (ibv_devices) +domain = lo + +# 3. tx_depth +# tx_depth applies to hints->tx_attr->size, where hint is a struct fi_info object. +# see https://ofiwg.github.io/libfabric/master/man/fi_getinfo.3.html +tx_depth = 256 + +# 4. rx_depth: +# rx_depth applies to hints->rx_attr->size, where hint is a struct fi_info object. +# see https://ofiwg.github.io/libfabric/master/man/fi_getinfo.3.html +rx_depth = 256 + +# Persistent configurations +[PERS] +# persistent directory for file system-based logfile. +file_path = .plog +ramdisk_path = /dev/shm/volatile_t +# Reset persistent data +# CAUTION: "reset = true" removes existing persisted data!!! +reset = false +# Max number of the log entries in each persistent, default to 1048576 +max_log_entry = 1048576 +# Max data size in bytes for each persistent, default to 512GB +max_data_size = 549755813888 +# Server clock skew delta in microseconds for put_by_time timestamp validation +server_clock_skew_delta_us = 1000000 +# Temporal consistency delta +temporal_consistency_delta_us = 1000000 + + +# Logger configurations +[LOGGER] +# default log name +default_log_name = derecho_debug +# default log level +# Available options: +# trace,debug,info,warn,error,critical,off +default_log_level = off + + +[LAYOUT] +json_layout_file = 'layout.json' + +# cascade service configuration +# TODO: add document for how to setup a cascade service. +[CASCADE] +cpu_cores = 0-3 +num_stateless_workers_for_multicast_ocdp = 2 +num_stateless_workers_for_p2p_ocdp = 2 +num_stateful_workers_for_multicast_ocdp = 2 +num_stateful_workers_for_p2p_ocdp = 2 +worker_cpu_affinity = ' +{ + "multicast_ocdp" : { + "0":"0", + "1":"1" + }, + "p2p_ocdp" : { + "0":"2", + "1":"3" + } +} +' + diff --git a/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/udl_dlls.cfg.tmp b/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/udl_dlls.cfg.tmp new file mode 100644 index 00000000..03ef26d3 --- /dev/null +++ b/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/udl_dlls.cfg.tmp @@ -0,0 +1 @@ +../../libconsole_printer_udl.so diff --git a/src/applications/tests/cascade_as_subgroup_classes/test_get_by_time.cpp b/src/applications/tests/cascade_as_subgroup_classes/test_get_by_time.cpp new file mode 100644 index 00000000..bc18080e --- /dev/null +++ b/src/applications/tests/cascade_as_subgroup_classes/test_get_by_time.cpp @@ -0,0 +1,307 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace derecho::cascade; + +/** + * Test client for get_by_time API with temporal consistency checks + * + * This program tests the get_by_time functionality with the new temporal consistency + * implementation that uses PERS_TEMPORAL_CONSISTENCY_DELTA and PERS_SERVER_CLOCK_SKEW_DELTA_US. + * + * Tests: + * 1. get_by_time with too recent timestamp (should return INVALID_VERSION) + * 2. get_by_time with timestamp in middle range (threshold2 <= ts < threshold1) + * 3. get_by_time with old enough timestamp (ts < threshold2) + * 4. Verify closest version selection + * 5. Test with multiple puts at different times + */ +int main(int argc, char** argv) { + try { + // Initialize the Cascade client (singleton) + auto& capi = ServiceClientAPI::get_service_client(); + + std::cout << "=== Testing get_by_time API with Temporal Consistency ===" << std::endl; + std::cout << std::endl; + + // Get configuration values for reference + uint64_t temporal_consistency_delta_us = derecho::getConfUInt64(derecho::Conf::PERS_TEMPORAL_CONSISTENCY_DELTA_US); + uint64_t clock_skew_delta_us = derecho::getConfUInt64(derecho::Conf::PERS_SERVER_CLOCK_SKEW_DELTA_US); + + std::cout << "Configuration:" << std::endl; + std::cout << " PERS_TEMPORAL_CONSISTENCY_DELTA: " << temporal_consistency_delta_us << " microseconds" << std::endl; + std::cout << " PERS_SERVER_CLOCK_SKEW_DELTA_US: " << clock_skew_delta_us << " microseconds" << std::endl; + std::cout << std::endl; + + // Use subgroup index 0, shard index 0 for testing + uint32_t subgroup_index = 0; + uint32_t shard_index = 0; + + // First, populate the store with some data at known timestamps + std::cout << "=== Setup: Populating store with test data ===" << std::endl; + std::vector> test_data; // key -> timestamp + + // Put several objects with known timestamps + for (int i = 0; i < 5; i++) { + ObjectWithStringKey obj; + obj.key = "test_key_" + std::to_string(i); + obj.blob = Blob(reinterpret_cast(("test_value_" + std::to_string(i)).c_str()), + ("test_value_" + std::to_string(i)).length()); + obj.previous_version = persistent::INVALID_VERSION; + obj.previous_version_by_key = persistent::INVALID_VERSION; + + // Use current time for each put + uint64_t timestamp_us = get_walltime() / 1000ULL; + std::cout << " Putting key: " << obj.key << " at timestamp: " << timestamp_us << std::endl; + + auto result = capi.template put_by_time( + obj, timestamp_us, subgroup_index, shard_index, false); + + // Wait for result and store the actual timestamp + for (auto& reply_future : result.get()) { + auto reply = reply_future.second.get(); + uint64_t actual_timestamp = std::get<1>(reply); + test_data.push_back({obj.key, actual_timestamp}); + std::cout << " ✓ Put successful. Version: " << std::get<0>(reply) + << ", Timestamp: " << actual_timestamp << std::endl; + } + + // Small delay between puts to ensure different timestamps + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + std::cout << std::endl; + + // Wait a bit to ensure data is stable + std::cout << "Waiting for data to stabilize..." << std::endl; + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + std::cout << std::endl; + + // Test 1: get_by_time with too recent timestamp (should return INVALID_VERSION) + std::cout << "=== Test 1: get_by_time with too recent timestamp ===" << std::endl; + { + uint64_t now_us = get_walltime() / 1000ULL; + uint64_t threshold1 = now_us - temporal_consistency_delta_us - 2 * clock_skew_delta_us; + uint64_t recent_time = threshold1 - 1000; // Just before threshold1 + + std::cout << " Current time: " << now_us << " microseconds" << std::endl; + std::cout << " Threshold1 (too recent): " << threshold1 << " microseconds" << std::endl; + std::cout << " Requesting time: " << recent_time << " microseconds" << std::endl; + std::cout << " Expected: INVALID_VERSION (time too recent, not stable)" << std::endl; + + auto result = capi.template get_by_time( + test_data[0].first, recent_time, true, subgroup_index, shard_index); + + bool found_valid = false; + for (auto& reply_future : result.get()) { + auto obj = reply_future.second.get(); + if (obj.key == test_data[0].first) { + found_valid = true; + std::cout << " ✗ Unexpected: Found object with key: " << obj.key << std::endl; + } + } + + if (!found_valid) { + std::cout << " ✓ Correctly returned INVALID_VERSION (empty result)" << std::endl; + } + } + std::cout << std::endl; + + // Test 2: get_by_time with timestamp in middle range (threshold2 <= ts < threshold1) + std::cout << "=== Test 2: get_by_time with timestamp in middle range ===" << std::endl; + { + uint64_t now_us = get_walltime() / 1000ULL; + uint64_t threshold1 = now_us - temporal_consistency_delta_us - 2 * clock_skew_delta_us; + uint64_t threshold2 = now_us - temporal_consistency_delta_us - 3 * clock_skew_delta_us; + + // Use a timestamp in the middle range + uint64_t middle_time = (threshold1 + threshold2) / 2; + + std::cout << " Current time: " << now_us << " microseconds" << std::endl; + std::cout << " Threshold1: " << threshold1 << " microseconds" << std::endl; + std::cout << " Threshold2: " << threshold2 << " microseconds" << std::endl; + std::cout << " Requesting time: " << middle_time << " microseconds" << std::endl; + std::cout << " Expected: Closest version <= threshold1" << std::endl; + + // Use the oldest test data key + if (!test_data.empty()) { + auto result = capi.template get_by_time( + test_data[0].first, middle_time, true, subgroup_index, shard_index); + + bool found = false; + for (auto& reply_future : result.get()) { + auto obj = reply_future.second.get(); + std::cout << " ✓ Found object with key: " << obj.key + << ", timestamp: " << obj.timestamp_us << std::endl; + if (obj.timestamp_us <= threshold1) { + std::cout << " ✓ Timestamp is within threshold1" << std::endl; + } else { + std::cout << " ✗ Warning: Timestamp exceeds threshold1" << std::endl; + } + found = true; + } + + if (!found) { + std::cout << " ✗ No object found" << std::endl; + } + } + } + std::cout << std::endl; + + // Test 3: get_by_time with old enough timestamp (ts < threshold2) + std::cout << "=== Test 3: get_by_time with old enough timestamp ===" << std::endl; + { + uint64_t now_us = get_walltime() / 1000ULL; + uint64_t threshold2 = now_us - temporal_consistency_delta_us - 3 * clock_skew_delta_us; + + // Use a timestamp well before threshold2 + uint64_t old_time = threshold2 - clock_skew_delta_us; + + std::cout << " Current time: " << now_us << " microseconds" << std::endl; + std::cout << " Threshold2: " << threshold2 << " microseconds" << std::endl; + std::cout << " Requesting time: " << old_time << " microseconds" << std::endl; + std::cout << " Expected: Closest version <= (requested_time + clock_skew_delta)" << std::endl; + + if (!test_data.empty()) { + auto result = capi.template get_by_time( + test_data[0].first, old_time, true, subgroup_index, shard_index); + + bool found = false; + for (auto& reply_future : result.get()) { + auto obj = reply_future.second.get(); + uint64_t max_allowed = old_time + clock_skew_delta_us; + std::cout << " ✓ Found object with key: " << obj.key + << ", timestamp: " << obj.timestamp_us << std::endl; + std::cout << " Max allowed timestamp: " << max_allowed << std::endl; + if (obj.timestamp_us <= max_allowed) { + std::cout << " ✓ Timestamp is within allowed range" << std::endl; + } else { + std::cout << " ✗ Warning: Timestamp exceeds allowed range" << std::endl; + } + found = true; + } + + if (!found) { + std::cout << " ✗ No object found" << std::endl; + } + } + } + std::cout << std::endl; + + // Test 4: Verify closest version selection with multiple versions + std::cout << "=== Test 4: Verify closest version selection ===" << std::endl; + { + if (test_data.size() >= 3) { + // Get the timestamps of the first and third objects + uint64_t first_timestamp = test_data[0].second; + uint64_t third_timestamp = test_data[2].second; + + // Request a time between first and third + uint64_t target_time = (first_timestamp + third_timestamp) / 2; + + std::cout << " First object timestamp: " << first_timestamp << std::endl; + std::cout << " Third object timestamp: " << third_timestamp << std::endl; + std::cout << " Target time: " << target_time << std::endl; + std::cout << " Expected: Closest version to target_time" << std::endl; + + // Test with the key that has multiple versions + auto result = capi.template get_by_time( + test_data[0].first, target_time, true, subgroup_index, shard_index); + + bool found = false; + for (auto& reply_future : result.get()) { + auto obj = reply_future.second.get(); + uint64_t distance = (obj.timestamp_us > target_time) ? + (obj.timestamp_us - target_time) : + (target_time - obj.timestamp_us); + std::cout << " ✓ Found object with timestamp: " << obj.timestamp_us << std::endl; + std::cout << " Distance from target: " << distance << " microseconds" << std::endl; + found = true; + } + + if (!found) { + std::cout << " ✗ No object found" << std::endl; + } + } else { + std::cout << " Skipped: Need at least 3 test objects" << std::endl; + } + } + std::cout << std::endl; + + // Test 5: get_by_time with exact timestamp match + std::cout << "=== Test 5: get_by_time with exact timestamp match ===" << std::endl; + { + if (!test_data.empty()) { + uint64_t exact_timestamp = test_data[0].second; + + std::cout << " Requesting exact timestamp: " << exact_timestamp << std::endl; + std::cout << " Expected: Object with matching timestamp" << std::endl; + + auto result = capi.template get_by_time( + test_data[0].first, exact_timestamp, true, subgroup_index, shard_index); + + bool found = false; + for (auto& reply_future : result.get()) { + auto obj = reply_future.second.get(); + std::cout << " ✓ Found object with key: " << obj.key + << ", timestamp: " << obj.timestamp_us << std::endl; + if (obj.timestamp_us == exact_timestamp) { + std::cout << " ✓ Exact timestamp match!" << std::endl; + } else { + std::cout << " Note: Returned closest timestamp: " << obj.timestamp_us << std::endl; + } + found = true; + } + + if (!found) { + std::cout << " ✗ No object found" << std::endl; + } + } + } + std::cout << std::endl; + + // Test 6: get_by_time with very old timestamp (should still work if data exists) + std::cout << "=== Test 6: get_by_time with very old timestamp ===" << std::endl; + { + if (!test_data.empty()) { + // Use a timestamp from 10 seconds ago + uint64_t now_us = get_walltime() / 1000ULL; + uint64_t very_old_time = now_us - 10000000ULL; // 10 seconds ago + + std::cout << " Current time: " << now_us << " microseconds" << std::endl; + std::cout << " Requesting very old time: " << very_old_time << " microseconds" << std::endl; + std::cout << " Expected: Closest version if data exists" << std::endl; + + auto result = capi.template get_by_time( + test_data[0].first, very_old_time, true, subgroup_index, shard_index); + + bool found = false; + for (auto& reply_future : result.get()) { + auto obj = reply_future.second.get(); + std::cout << " ✓ Found object with timestamp: " << obj.timestamp_us << std::endl; + found = true; + } + + if (!found) { + std::cout << " Note: No object found (may be expected if timestamp is too old)" << std::endl; + } + } + } + std::cout << std::endl; + + std::cout << "=== All tests completed ===" << std::endl; + return 0; + + } catch (const std::exception& e) { + std::cerr << "Exception: " << e.what() << std::endl; + return -1; + } +} \ No newline at end of file diff --git a/src/applications/tests/cascade_as_subgroup_classes/test_put_by_time.cpp b/src/applications/tests/cascade_as_subgroup_classes/test_put_by_time.cpp new file mode 100644 index 00000000..d5bedd86 --- /dev/null +++ b/src/applications/tests/cascade_as_subgroup_classes/test_put_by_time.cpp @@ -0,0 +1,210 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace derecho::cascade; + +/** + * Test client for put_by_time API + * + * This program tests the new put_by_time functionality using explicit subgroup/shard indices. + * Tests: + * 1. Valid timestamp test - should succeed + * 2. Invalid (too old) timestamp test - should be rejected + * 3. Comparison with regular put - verify timestamps are different + */ +int main(int argc, char** argv) { + try { + // Initialize the Cascade client (singleton) + auto& capi = ServiceClientAPI::get_service_client(); + + std::cout << "=== Testing put_by_time API ===" << std::endl; + std::cout << std::endl; + + // Use subgroup index 0, shard index 0 for testing + uint32_t subgroup_index = 0; + uint32_t shard_index = 0; + + // Test 1: put_by_time with valid timestamp (current time) + std::cout << "Test 1: put_by_time with current timestamp" << std::endl; + { + ObjectWithStringKey obj; + obj.key = "test_key_1"; + obj.blob = Blob(reinterpret_cast("test_value_1"), 12); + obj.previous_version = persistent::INVALID_VERSION; + obj.previous_version_by_key = persistent::INVALID_VERSION; + + // Get current time in microseconds + uint64_t current_time_us = get_walltime() / 1000ULL; + std::cout << " Current time: " << current_time_us << " microseconds" << std::endl; + std::cout << " Calling put_by_time..." << std::endl; + + auto result = capi.template put_by_time(obj, current_time_us, subgroup_index, shard_index, false); + + // Wait for result + for (auto& reply_future : result.get()) { + auto reply = reply_future.second.get(); + std::cout << " ✓ Success! Version: " << std::get<0>(reply) + << ", Timestamp: " << std::get<1>(reply) << " microseconds" << std::endl; + } + } + std::cout << std::endl; + + // Test 2: put_by_time with future timestamp (should work) + std::cout << "Test 2: put_by_time with future timestamp (within delta)" << std::endl; + { + ObjectWithStringKey obj; + obj.key = "test_key_2"; + obj.blob = Blob(reinterpret_cast("test_value_2"), 12); + obj.previous_version = persistent::INVALID_VERSION; + obj.previous_version_by_key = persistent::INVALID_VERSION; + + // Use a timestamp 500ms in the future (within 1 second delta) + uint64_t current_time_us = get_walltime() / 1000ULL; + uint64_t future_time_us = current_time_us + 500000ULL; // 500ms = 500000 microseconds + std::cout << " Current time: " << current_time_us << " microseconds" << std::endl; + std::cout << " Future time: " << future_time_us << " microseconds" << std::endl; + std::cout << " Calling put_by_time..." << std::endl; + + auto result = capi.template put_by_time(obj, future_time_us, subgroup_index, shard_index, false); + + // Wait for result + for (auto& reply_future : result.get()) { + auto reply = reply_future.second.get(); + std::cout << " ✓ Success! Version: " << std::get<0>(reply) + << ", Timestamp: " << std::get<1>(reply) << " microseconds" << std::endl; + } + } + std::cout << std::endl; + + // Test 3: put_by_time with too old timestamp (should be rejected) + std::cout << "Test 3: put_by_time with too old timestamp (should be rejected)" << std::endl; + { + ObjectWithStringKey obj; + obj.key = "test_key_3"; + obj.blob = Blob(reinterpret_cast("test_value_3"), 12); + obj.previous_version = persistent::INVALID_VERSION; + obj.previous_version_by_key = persistent::INVALID_VERSION; + + // Use a timestamp 2 seconds in the past (older than 1 second delta) + uint64_t current_time_us = get_walltime() / 1000ULL; + uint64_t old_time_us = current_time_us - 2000000ULL; // 2 seconds = 2000000 microseconds + std::cout << " Current time: " << current_time_us << " microseconds" << std::endl; + std::cout << " Old time: " << old_time_us << " microseconds" << std::endl; + std::cout << " Calling put_by_time (should be rejected)..." << std::endl; + + try { + auto result = capi.template put_by_time(obj, old_time_us, subgroup_index, shard_index, false); + // If we get here, the call didn't throw - check if result is empty + bool has_results = false; + for (auto& reply_future : result.get()) { + has_results = true; + auto reply = reply_future.second.get(); + std::cout << " ✗ Unexpected success! Version: " << std::get<0>(reply) + << ", Timestamp: " << std::get<1>(reply) << " microseconds" << std::endl; + } + if (!has_results) { + std::cout << " ✓ Correctly rejected (empty result)" << std::endl; + } + } catch (const derecho::derecho_exception& e) { + std::cout << " ✓ Correctly rejected with exception: " << e.what() << std::endl; + } + } + std::cout << std::endl; + + // Test 4: Compare put_by_time vs regular put + std::cout << "Test 4: Compare put_by_time vs regular put" << std::endl; + { + // First, do a regular put + ObjectWithStringKey obj1; + obj1.key = "test_key_4a"; + obj1.blob = Blob(reinterpret_cast("test_value_4a"), 13); + obj1.previous_version = persistent::INVALID_VERSION; + obj1.previous_version_by_key = persistent::INVALID_VERSION; + + std::cout << " Regular put..." << std::endl; + auto result1 = capi.template put(obj1, subgroup_index, shard_index, false); + uint64_t regular_put_timestamp = 0; + for (auto& reply_future : result1.get()) { + auto reply = reply_future.second.get(); + regular_put_timestamp = std::get<1>(reply); + std::cout << " Timestamp: " << regular_put_timestamp << " microseconds" << std::endl; + } + + // Small delay to ensure different timestamps + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + // Now do put_by_time with a specific timestamp + ObjectWithStringKey obj2; + obj2.key = "test_key_4b"; + obj2.blob = Blob(reinterpret_cast("test_value_4b"), 13); + obj2.previous_version = persistent::INVALID_VERSION; + obj2.previous_version_by_key = persistent::INVALID_VERSION; + + uint64_t custom_timestamp_us = get_walltime() / 1000ULL; + std::cout << " put_by_time with custom timestamp: " << custom_timestamp_us << " microseconds" << std::endl; + auto result2 = capi.template put_by_time(obj2, custom_timestamp_us, subgroup_index, shard_index, false); + uint64_t custom_put_timestamp = 0; + for (auto& reply_future : result2.get()) { + auto reply = reply_future.second.get(); + custom_put_timestamp = std::get<1>(reply); + std::cout << " Timestamp: " << custom_put_timestamp << " microseconds" << std::endl; + } + + // Verify the custom timestamp was used + if (custom_put_timestamp == custom_timestamp_us) { + std::cout << " ✓ Custom timestamp was correctly used!" << std::endl; + } else { + std::cout << " ✗ Warning: Custom timestamp mismatch. Expected: " << custom_timestamp_us + << ", Got: " << custom_put_timestamp << std::endl; + } + } + std::cout << std::endl; + + // Test 5: put_by_time with specific timestamp value + std::cout << "Test 5: put_by_time with specific timestamp value" << std::endl; + { + ObjectWithStringKey obj; + obj.key = "test_key_5"; + obj.blob = Blob(reinterpret_cast("test_value_5"), 12); + obj.previous_version = persistent::INVALID_VERSION; + obj.previous_version_by_key = persistent::INVALID_VERSION; + + // Use a specific timestamp (e.g., 1000000000 microseconds = 1000 seconds) + uint64_t specific_timestamp_us = 1000000000ULL; + std::cout << " Using specific timestamp: " << specific_timestamp_us << " microseconds" << std::endl; + std::cout << " Note: This will be rejected if it's too old" << std::endl; + + try { + auto result = capi.template put_by_time(obj, specific_timestamp_us, subgroup_index, shard_index, false); + bool has_results = false; + for (auto& reply_future : result.get()) { + has_results = true; + auto reply = reply_future.second.get(); + std::cout << " Result - Version: " << std::get<0>(reply) + << ", Timestamp: " << std::get<1>(reply) << " microseconds" << std::endl; + } + if (!has_results) { + std::cout << " Timestamp was rejected (empty result)" << std::endl; + } + } catch (const derecho::derecho_exception& e) { + std::cout << " Timestamp was rejected with exception: " << e.what() << std::endl; + } + } + std::cout << std::endl; + + std::cout << "=== All tests completed ===" << std::endl; + return 0; + + } catch (const std::exception& e) { + std::cerr << "Exception: " << e.what() << std::endl; + return -1; + } +} + From 30600411d05bcb05bf6fdb99c42faea883de32ca Mon Sep 17 00:00:00 2001 From: aliciayuting Date: Wed, 14 Jan 2026 16:57:03 -0500 Subject: [PATCH 3/5] added logic for RPC function of put_by_time for external client --- .../cascade/detail/persistent_store_impl.hpp | 18 ++++++++++++++++++ include/cascade/detail/service_client_impl.hpp | 10 +++++----- include/cascade/detail/trigger_store_impl.hpp | 7 +++++++ include/cascade/detail/volatile_store_impl.hpp | 18 ++++++++++++++++++ include/cascade/persistent_store.hpp | 2 ++ include/cascade/trigger_store.hpp | 3 +++ include/cascade/volatile_store.hpp | 2 ++ .../consistency_test_cfg/n0/derecho.cfg | 2 +- .../consistency_test_cfg/n1/derecho.cfg | 2 +- .../consistency_test_cfg/n2/derecho.cfg | 2 +- .../consistency_test_cfg/n3/derecho.cfg | 2 +- .../test_put_by_time.cpp | 7 +++++-- 12 files changed, 64 insertions(+), 11 deletions(-) diff --git a/include/cascade/detail/persistent_store_impl.hpp b/include/cascade/detail/persistent_store_impl.hpp index 10355312..20546cca 100644 --- a/include/cascade/detail/persistent_store_impl.hpp +++ b/include/cascade/detail/persistent_store_impl.hpp @@ -42,6 +42,24 @@ version_tuple PersistentCascadeStore::put(const VT& value, b return ret; } +template +version_tuple PersistentCascadeStore::put_with_timestamp(const VT& value, uint64_t timestamp_us, bool as_trigger) const { + debug_enter_func_with_args("value.get_key_ref()={}, timestamp_us={}", value.get_key_ref(), timestamp_us); + LOG_TIMESTAMP_BY_TAG(TLT_PERSISTENT_PUT_START, group, value); + + derecho::Replicated& subgroup_handle = group->template get_subgroup(this->subgroup_index); + auto results = subgroup_handle.template ordered_send_with_timestamp(timestamp_us, value, as_trigger); + auto& replies = results.get(); + version_tuple ret{CURRENT_VERSION, 0}; + for(auto& reply_pair : replies) { + ret = reply_pair.second.get(); + } + + LOG_TIMESTAMP_BY_TAG(TLT_PERSISTENT_PUT_END, group, value); + debug_leave_func_with_value("version=0x{:x},timestamp={}us", std::get<0>(ret), std::get<1>(ret)); + return ret; +} + template void PersistentCascadeStore::put_and_forget(const VT& value, bool as_trigger) const { debug_enter_func_with_args("value.get_key_ref()={}", value.get_key_ref()); diff --git a/include/cascade/detail/service_client_impl.hpp b/include/cascade/detail/service_client_impl.hpp index d2357a3d..fda5d10e 100644 --- a/include/cascade/detail/service_client_impl.hpp +++ b/include/cascade/detail/service_client_impl.hpp @@ -813,22 +813,22 @@ derecho::rpc::QueryResults ServiceClient::put_by auto& subgroup_handle = group_ptr->template get_subgroup(subgroup_index); return subgroup_handle.template ordered_send_with_timestamp(timestamp_us, value, as_trigger); } else { - // p2p put - timestamp not supported for p2p, fall back to regular put + // p2p put_with_timestamp - send to a shard member who will do ordered_send_with_timestamp node_id_t node_id = pick_member_by_policy(subgroup_index,shard_index,value.get_key_ref()); try { auto& subgroup_handle = group_ptr->template get_subgroup(subgroup_index); - return subgroup_handle.template p2p_send(node_id,value,as_trigger); + return subgroup_handle.template p2p_send(node_id,value,timestamp_us,as_trigger); } catch (derecho::invalid_subgroup_exception& ex) { auto& subgroup_handle = group_ptr->template get_nonmember_subgroup(subgroup_index); - return subgroup_handle.template p2p_send(node_id,value,as_trigger); + return subgroup_handle.template p2p_send(node_id,value,timestamp_us,as_trigger); } } } else { - // External client - timestamp not supported, fall back to regular put + // External client - use put_with_timestamp P2P call std::lock_guard lck(this->external_group_ptr_mutex); auto& caller = external_group_ptr->template get_subgroup_caller(subgroup_index); node_id_t node_id = pick_member_by_policy(subgroup_index,shard_index,value.get_key_ref()); - return caller.template p2p_send(node_id,value,as_trigger); + return caller.template p2p_send(node_id,value,timestamp_us,as_trigger); } } diff --git a/include/cascade/detail/trigger_store_impl.hpp b/include/cascade/detail/trigger_store_impl.hpp index 0f85aaf0..46de1426 100644 --- a/include/cascade/detail/trigger_store_impl.hpp +++ b/include/cascade/detail/trigger_store_impl.hpp @@ -23,6 +23,13 @@ version_tuple TriggerCascadeNoStore::put(const VT& value, bool a return {persistent::INVALID_VERSION, 0}; } +template +version_tuple TriggerCascadeNoStore::put_with_timestamp(const VT& value, uint64_t timestamp_us, bool as_trigger) const { + // TriggerCascadeNoStore doesn't support timestamps, fall back to put_and_forget + put_and_forget(value, as_trigger); + return {persistent::INVALID_VERSION, 0}; +} + template void TriggerCascadeNoStore::put_and_forget(const VT& value, bool as_trigger) const { dbg_default_warn("Calling unsupported func:{}", __PRETTY_FUNCTION__); diff --git a/include/cascade/detail/volatile_store_impl.hpp b/include/cascade/detail/volatile_store_impl.hpp index 69e90c8a..d7516080 100644 --- a/include/cascade/detail/volatile_store_impl.hpp +++ b/include/cascade/detail/volatile_store_impl.hpp @@ -35,6 +35,24 @@ version_tuple VolatileCascadeStore::put(const VT& value, bool as return ret; } +template +version_tuple VolatileCascadeStore::put_with_timestamp(const VT& value, uint64_t timestamp_us, bool as_trigger) const { + debug_enter_func_with_args("value.get_key_ref={}, timestamp_us={}", value.get_key_ref(), timestamp_us); + LOG_TIMESTAMP_BY_TAG(TLT_VOLATILE_PUT_START, group, value); + + derecho::Replicated& subgroup_handle = group->template get_subgroup(this->subgroup_index); + auto results = subgroup_handle.template ordered_send_with_timestamp(timestamp_us, value, as_trigger); + auto& replies = results.get(); + version_tuple ret{CURRENT_VERSION, 0}; + for(auto& reply_pair : replies) { + ret = reply_pair.second.get(); + } + + LOG_TIMESTAMP_BY_TAG(TLT_VOLATILE_PUT_END, group, value); + debug_leave_func_with_value("version=0x{:x},timestamp={}us", std::get<0>(ret), std::get<1>(ret)); + return ret; +} + template void VolatileCascadeStore::put_and_forget(const VT& value, bool as_trigger) const { debug_enter_func_with_args("value.get_key_ref={}", value.get_key_ref()); diff --git a/include/cascade/persistent_store.hpp b/include/cascade/persistent_store.hpp index a32704c6..4e7350ff 100644 --- a/include/cascade/persistent_store.hpp +++ b/include/cascade/persistent_store.hpp @@ -42,6 +42,7 @@ class PersistentCascadeStore : public ICascadeStore, REGISTER_RPC_FUNCTIONS_WITH_NOTIFICATION(PersistentCascadeStore, P2P_TARGETS( put, + put_with_timestamp, put_and_forget, #ifdef ENABLE_EVALUATION perf_put, @@ -86,6 +87,7 @@ class PersistentCascadeStore : public ICascadeStore, #endif // ENABLE_EVALUATION virtual void trigger_put(const VT& value) const override; virtual version_tuple put(const VT& value, bool as_trigger) const override; + virtual version_tuple put_with_timestamp(const VT& value, uint64_t timestamp_us, bool as_trigger) const; virtual void put_and_forget(const VT& value, bool as_trigger) const override; #ifdef ENABLE_EVALUATION virtual double perf_put(const uint32_t max_payload_size, const uint64_t duration_sec) const override; diff --git a/include/cascade/trigger_store.hpp b/include/cascade/trigger_store.hpp index 95fdb921..648d6a00 100644 --- a/include/cascade/trigger_store.hpp +++ b/include/cascade/trigger_store.hpp @@ -37,6 +37,7 @@ class TriggerCascadeNoStore : public ICascadeStore, REGISTER_RPC_FUNCTIONS_WITH_NOTIFICATION(TriggerCascadeNoStore, P2P_TARGETS( put, + put_with_timestamp, put_and_forget, #ifdef ENABLE_EVALUATION perf_put, @@ -81,6 +82,8 @@ class TriggerCascadeNoStore : public ICascadeStore, #endif // ENABLE_EVALUATION virtual void trigger_put(const VT& value) const override; virtual version_tuple put(const VT& value, bool as_trigger) const override; + // Fall back to put_and_forget since TriggerCascadeNoStore doesn't support timestamps + virtual version_tuple put_with_timestamp(const VT& value, uint64_t timestamp_us, bool as_trigger) const; virtual void put_and_forget(const VT& value, bool as_trigger) const override; #ifdef ENABLE_EVALUATION virtual double perf_put(const uint32_t max_payload_size, const uint64_t duration_sec) const override; diff --git a/include/cascade/volatile_store.hpp b/include/cascade/volatile_store.hpp index 6f2d166f..ffcdd5e1 100644 --- a/include/cascade/volatile_store.hpp +++ b/include/cascade/volatile_store.hpp @@ -47,6 +47,7 @@ class VolatileCascadeStore : public ICascadeStore, REGISTER_RPC_FUNCTIONS_WITH_NOTIFICATION(VolatileCascadeStore, P2P_TARGETS( put, + put_with_timestamp, put_and_forget, #ifdef ENABLE_EVALUATION perf_put, @@ -91,6 +92,7 @@ class VolatileCascadeStore : public ICascadeStore, #endif // ENABLE_EVALUATION virtual void trigger_put(const VT& value) const override; virtual version_tuple put(const VT& value, bool as_trigger) const override; + virtual version_tuple put_with_timestamp(const VT& value, uint64_t timestamp_us, bool as_trigger) const; #ifdef ENABLE_EVALUATION virtual double perf_put(const uint32_t max_payload_size, const uint64_t duration_sec) const override; #endif // ENABLE_EVALUATION diff --git a/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n0/derecho.cfg b/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n0/derecho.cfg index b4cc2ab4..81cf7667 100644 --- a/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n0/derecho.cfg +++ b/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n0/derecho.cfg @@ -158,7 +158,7 @@ max_log_entry = 1048576 # Max data size in bytes for each persistent, default to 512GB max_data_size = 549755813888 # Server clock skew delta in microseconds for put_by_time timestamp validation -server_clock_skew_delta_us = 1000000 +server_clock_skew_delta_us = 1000 # Temporal consistency delta temporal_consistency_delta_us = 1000000 diff --git a/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n1/derecho.cfg b/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n1/derecho.cfg index a83c9e05..1dfc4eab 100644 --- a/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n1/derecho.cfg +++ b/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n1/derecho.cfg @@ -158,7 +158,7 @@ max_log_entry = 1048576 # Max data size in bytes for each persistent, default to 512GB max_data_size = 549755813888 # Server clock skew delta in microseconds for put_by_time timestamp validation -server_clock_skew_delta_us = 1000000 +server_clock_skew_delta_us = 1000 # Temporal consistency delta temporal_consistency_delta_us = 1000000 diff --git a/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n2/derecho.cfg b/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n2/derecho.cfg index 5609422d..e0b11aaf 100644 --- a/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n2/derecho.cfg +++ b/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n2/derecho.cfg @@ -158,7 +158,7 @@ max_log_entry = 1048576 # Max data size in bytes for each persistent, default to 512GB max_data_size = 549755813888 # Server clock skew delta in microseconds for put_by_time timestamp validation -server_clock_skew_delta_us = 1000000 +server_clock_skew_delta_us = 1000 # Temporal consistency delta temporal_consistency_delta_us = 1000000 diff --git a/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n3/derecho.cfg b/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n3/derecho.cfg index 73430c4f..9a9e18e5 100644 --- a/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n3/derecho.cfg +++ b/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n3/derecho.cfg @@ -158,7 +158,7 @@ max_log_entry = 1048576 # Max data size in bytes for each persistent, default to 512GB max_data_size = 549755813888 # Server clock skew delta in microseconds for put_by_time timestamp validation -server_clock_skew_delta_us = 1000000 +server_clock_skew_delta_us = 1000 # Temporal consistency delta temporal_consistency_delta_us = 1000000 diff --git a/src/applications/tests/cascade_as_subgroup_classes/test_put_by_time.cpp b/src/applications/tests/cascade_as_subgroup_classes/test_put_by_time.cpp index d5bedd86..24895f26 100644 --- a/src/applications/tests/cascade_as_subgroup_classes/test_put_by_time.cpp +++ b/src/applications/tests/cascade_as_subgroup_classes/test_put_by_time.cpp @@ -147,8 +147,11 @@ int main(int argc, char** argv) { obj2.previous_version = persistent::INVALID_VERSION; obj2.previous_version_by_key = persistent::INVALID_VERSION; - uint64_t custom_timestamp_us = get_walltime() / 1000ULL; + // Use a timestamp that's guaranteed to be higher than the current HLC + // (regular_put_timestamp + 100ms to be safe) + uint64_t custom_timestamp_us = regular_put_timestamp + 100000ULL; // 100ms ahead of regular put std::cout << " put_by_time with custom timestamp: " << custom_timestamp_us << " microseconds" << std::endl; + std::cout << " (100ms ahead of regular put to ensure HLC advances)" << std::endl; auto result2 = capi.template put_by_time(obj2, custom_timestamp_us, subgroup_index, shard_index, false); uint64_t custom_put_timestamp = 0; for (auto& reply_future : result2.get()) { @@ -161,7 +164,7 @@ int main(int argc, char** argv) { if (custom_put_timestamp == custom_timestamp_us) { std::cout << " ✓ Custom timestamp was correctly used!" << std::endl; } else { - std::cout << " ✗ Warning: Custom timestamp mismatch. Expected: " << custom_timestamp_us + std::cout << " ✗ Error: Custom timestamp mismatch. Expected: " << custom_timestamp_us << ", Got: " << custom_put_timestamp << std::endl; } } From 7f7bae3241b32fb058ebfb1789b512c90ff16453 Mon Sep 17 00:00:00 2001 From: aliciayuting Date: Sun, 18 Jan 2026 20:03:13 -0500 Subject: [PATCH 4/5] fixes for the temporal consistency test cases, added p2p validation code(currently in comments, need discussion of the exact value to check) --- .../cascade/detail/persistent_store_impl.hpp | 12 ++ include/cascade/detail/trigger_store_impl.hpp | 3 +- .../cascade/detail/volatile_store_impl.hpp | 12 ++ include/cascade/trigger_store.hpp | 1 - src/applications/tests/CMakeLists.txt | 1 + .../CMakeLists.txt | 55 +---- .../consistency_test_cfg/n0/derecho.cfg | 197 ----------------- .../consistency_test_cfg/n1/derecho.cfg | 199 ------------------ .../consistency_test_cfg/n2/derecho.cfg | 199 ------------------ .../consistency_test_cfg/n3/derecho.cfg | 199 ------------------ .../consistency_test_cfg/udl_dlls.cfg.tmp | 1 - .../temporal_consistency_test/CMakeLists.txt | 59 ++++++ .../consistency_test_cfg/derecho.cfg | 98 +++++++++ .../consistency_test_cfg/dfgs.json.tmp | 0 .../consistency_test_cfg/layout.json.tmp | 0 .../consistency_test_cfg/n0/derecho_node.cfg | 7 + .../consistency_test_cfg/n1/derecho_node.cfg | 7 + .../consistency_test_cfg/n2/derecho_node.cfg | 7 + .../consistency_test_cfg/n3/derecho_node.cfg | 7 + .../consistency_test_cfg/udl_dlls.cfg.tmp | 0 .../test_get_by_time.cpp | 2 +- .../test_put_by_time.cpp | 27 ++- 22 files changed, 225 insertions(+), 868 deletions(-) delete mode 100644 src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n0/derecho.cfg delete mode 100644 src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n1/derecho.cfg delete mode 100644 src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n2/derecho.cfg delete mode 100644 src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n3/derecho.cfg delete mode 100644 src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/udl_dlls.cfg.tmp create mode 100644 src/applications/tests/temporal_consistency_test/CMakeLists.txt create mode 100644 src/applications/tests/temporal_consistency_test/consistency_test_cfg/derecho.cfg rename src/applications/tests/{cascade_as_subgroup_classes => temporal_consistency_test}/consistency_test_cfg/dfgs.json.tmp (100%) rename src/applications/tests/{cascade_as_subgroup_classes => temporal_consistency_test}/consistency_test_cfg/layout.json.tmp (100%) create mode 100644 src/applications/tests/temporal_consistency_test/consistency_test_cfg/n0/derecho_node.cfg create mode 100644 src/applications/tests/temporal_consistency_test/consistency_test_cfg/n1/derecho_node.cfg create mode 100644 src/applications/tests/temporal_consistency_test/consistency_test_cfg/n2/derecho_node.cfg create mode 100644 src/applications/tests/temporal_consistency_test/consistency_test_cfg/n3/derecho_node.cfg create mode 100644 src/applications/tests/temporal_consistency_test/consistency_test_cfg/udl_dlls.cfg.tmp rename src/applications/tests/{cascade_as_subgroup_classes => temporal_consistency_test}/test_get_by_time.cpp (99%) rename src/applications/tests/{cascade_as_subgroup_classes => temporal_consistency_test}/test_put_by_time.cpp (93%) diff --git a/include/cascade/detail/persistent_store_impl.hpp b/include/cascade/detail/persistent_store_impl.hpp index 20546cca..57990daf 100644 --- a/include/cascade/detail/persistent_store_impl.hpp +++ b/include/cascade/detail/persistent_store_impl.hpp @@ -45,6 +45,18 @@ version_tuple PersistentCascadeStore::put(const VT& value, b template version_tuple PersistentCascadeStore::put_with_timestamp(const VT& value, uint64_t timestamp_us, bool as_trigger) const { debug_enter_func_with_args("value.get_key_ref()={}, timestamp_us={}", value.get_key_ref(), timestamp_us); + + // TODO: decide if we want to enable server side timestamp validation and what values to set + // uint64_t now_us = get_walltime() / 1000ULL; // Convert nanoseconds to microseconds + // uint64_t temporal_consistency_delta_us = derecho::getConfUInt64(derecho::Conf::PERS_TEMPORAL_CONSISTENCY_DELTA_US); + // uint64_t server_clock_skew_delta_us = derecho::getConfUInt64(derecho::Conf::PERS_SERVER_CLOCK_SKEW_DELTA_US); + // uint64_t threshold_us = now_us - temporal_consistency_delta_us - server_clock_skew_delta_us; + // if (timestamp_us < threshold_us) { + // dbg_default_warn("put_with_timestamp: rejecting timestamp {} us (threshold: {} us, now: {} us)", + // timestamp_us, threshold_us, now_us); + // throw derecho::derecho_exception("put_with_timestamp: timestamp is too old (older than now - temporal_consistency_delta - server_clock_skew_delta)"); + // } + LOG_TIMESTAMP_BY_TAG(TLT_PERSISTENT_PUT_START, group, value); derecho::Replicated& subgroup_handle = group->template get_subgroup(this->subgroup_index); diff --git a/include/cascade/detail/trigger_store_impl.hpp b/include/cascade/detail/trigger_store_impl.hpp index 46de1426..0dc5d59a 100644 --- a/include/cascade/detail/trigger_store_impl.hpp +++ b/include/cascade/detail/trigger_store_impl.hpp @@ -25,8 +25,7 @@ version_tuple TriggerCascadeNoStore::put(const VT& value, bool a template version_tuple TriggerCascadeNoStore::put_with_timestamp(const VT& value, uint64_t timestamp_us, bool as_trigger) const { - // TriggerCascadeNoStore doesn't support timestamps, fall back to put_and_forget - put_and_forget(value, as_trigger); + dbg_default_warn("Calling unsupported func:{}", __PRETTY_FUNCTION__); return {persistent::INVALID_VERSION, 0}; } diff --git a/include/cascade/detail/volatile_store_impl.hpp b/include/cascade/detail/volatile_store_impl.hpp index d7516080..4dfbb024 100644 --- a/include/cascade/detail/volatile_store_impl.hpp +++ b/include/cascade/detail/volatile_store_impl.hpp @@ -38,6 +38,18 @@ version_tuple VolatileCascadeStore::put(const VT& value, bool as template version_tuple VolatileCascadeStore::put_with_timestamp(const VT& value, uint64_t timestamp_us, bool as_trigger) const { debug_enter_func_with_args("value.get_key_ref={}, timestamp_us={}", value.get_key_ref(), timestamp_us); + + // TODO: decide if we want to enable server side timestamp validation and what values to set + // uint64_t now_us = get_walltime() / 1000ULL; // Convert nanoseconds to microseconds + // uint64_t temporal_consistency_delta_us = derecho::getConfUInt64(derecho::Conf::PERS_TEMPORAL_CONSISTENCY_DELTA_US); + // uint64_t server_clock_skew_delta_us = derecho::getConfUInt64(derecho::Conf::PERS_SERVER_CLOCK_SKEW_DELTA_US); + // uint64_t threshold_us = now_us - temporal_consistency_delta_us - server_clock_skew_delta_us; + // if (timestamp_us < threshold_us) { + // dbg_default_warn("put_with_timestamp: rejecting timestamp {} us (threshold: {} us, now: {} us)", + // timestamp_us, threshold_us, now_us); + // throw derecho::derecho_exception("put_with_timestamp: timestamp is too old (older than now - temporal_consistency_delta - server_clock_skew_delta)"); + // } + LOG_TIMESTAMP_BY_TAG(TLT_VOLATILE_PUT_START, group, value); derecho::Replicated& subgroup_handle = group->template get_subgroup(this->subgroup_index); diff --git a/include/cascade/trigger_store.hpp b/include/cascade/trigger_store.hpp index 648d6a00..d8cab056 100644 --- a/include/cascade/trigger_store.hpp +++ b/include/cascade/trigger_store.hpp @@ -82,7 +82,6 @@ class TriggerCascadeNoStore : public ICascadeStore, #endif // ENABLE_EVALUATION virtual void trigger_put(const VT& value) const override; virtual version_tuple put(const VT& value, bool as_trigger) const override; - // Fall back to put_and_forget since TriggerCascadeNoStore doesn't support timestamps virtual version_tuple put_with_timestamp(const VT& value, uint64_t timestamp_us, bool as_trigger) const; virtual void put_and_forget(const VT& value, bool as_trigger) const override; #ifdef ENABLE_EVALUATION diff --git a/src/applications/tests/CMakeLists.txt b/src/applications/tests/CMakeLists.txt index 9d04a2ef..b1e5ec45 100644 --- a/src/applications/tests/CMakeLists.txt +++ b/src/applications/tests/CMakeLists.txt @@ -2,3 +2,4 @@ add_subdirectory(unit_tests) add_subdirectory(cascade_as_subgroup_classes) add_subdirectory(user_defined_logic) add_subdirectory(pipeline) +add_subdirectory(temporal_consistency_test) \ No newline at end of file diff --git a/src/applications/tests/cascade_as_subgroup_classes/CMakeLists.txt b/src/applications/tests/cascade_as_subgroup_classes/CMakeLists.txt index 358cb547..56decc9c 100644 --- a/src/applications/tests/cascade_as_subgroup_classes/CMakeLists.txt +++ b/src/applications/tests/cascade_as_subgroup_classes/CMakeLists.txt @@ -22,57 +22,4 @@ add_custom_command(TARGET cli_example POST_BUILD ${CMAKE_CURRENT_BINARY_DIR}/cli_example_cfg COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/perf_shutdown.py ${CMAKE_CURRENT_BINARY_DIR}/perf_shutdown.py -) - -# Add test_get_by_time executable -add_executable(test_get_by_time test_get_by_time.cpp) -target_include_directories(test_get_by_time PRIVATE - $ - $ - $ - $ -) -target_link_libraries(test_get_by_time cascade pthread) - -# Test for put_by_time API -add_executable(test_put_by_time test_put_by_time.cpp) -target_include_directories(test_put_by_time PRIVATE - $ - $ - $ -) -target_link_libraries(test_put_by_time cascade) - -add_custom_command(TARGET test_put_by_time POST_BUILD - COMMAND ${CMAKE_COMMAND} -E copy_directory ${CMAKE_CURRENT_SOURCE_DIR}/consistency_test_cfg - ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg - COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/layout.json.tmp - ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n0/layout.json - COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/layout.json.tmp - ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n1/layout.json - COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/layout.json.tmp - ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n2/layout.json - COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/layout.json.tmp - ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n3/layout.json - COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/dfgs.json.tmp - ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n0/dfgs.json - COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/dfgs.json.tmp - ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n1/dfgs.json - COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/dfgs.json.tmp - ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n2/dfgs.json - COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/dfgs.json.tmp - ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n3/dfgs.json - COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/udl_dlls.cfg.tmp - ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n0/udl_dlls.cfg - COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/udl_dlls.cfg.tmp - ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n1/udl_dlls.cfg - COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/udl_dlls.cfg.tmp - ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n2/udl_dlls.cfg - COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/udl_dlls.cfg.tmp - ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n3/udl_dlls.cfg - DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/consistency_test_cfg/run.sh.tmp - ${CMAKE_CURRENT_SOURCE_DIR}/consistency_test_cfg/layout.json.tmp - ${CMAKE_CURRENT_SOURCE_DIR}/consistency_test_cfg/dfgs.json.tmp - ${CMAKE_CURRENT_SOURCE_DIR}/consistency_test_cfg/udl_dlls.cfg.tmp - COMMENT "prepare test_put_by_time configuration" -) +) \ No newline at end of file diff --git a/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n0/derecho.cfg b/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n0/derecho.cfg deleted file mode 100644 index 81cf7667..00000000 --- a/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n0/derecho.cfg +++ /dev/null @@ -1,197 +0,0 @@ -[DERECHO] -# leader ip - the leader's ip address -contact_ip = 127.0.0.1 -# leader gms port - the leader's gms port -contact_port = 23580 -# my local id - each node should have a different id -local_id = 0 -# my local ip address -local_ip = 127.0.0.1 -# derecho gms port -gms_port = 23580 -# derecho rpc port -state_transfer_port = 28366 -# sst tcp port -sst_port = 37683 -# rdmc tcp port -rdmc_port = 31675 -# external port -external_port = 32645 -# this is the frequency of the failure detector thread. -# It is best to leave this to 1 ms for RDMA. If it is too high, -# you run the risk of overflowing the queue of outstanding sends. -heartbeat_ms = 100 -# sst poll completion queue timeout in millisecond -sst_poll_cq_timeout_ms = 100 -# disable partitioning safety -# By disabling this feature, the derecho is allowed to run when active -# members cannot form a majority. Please be aware of the 'split-brain' -# syndrome:https://en.wikipedia.org/wiki/Split-brain and make sure your -# application is fine with it. -# To help the user play with derecho at beginning, we disabled the -# partitioning safety. We suggest to set it to false for serious deployment -disable_partitioning_safety = false - -# maximum payload size for P2P requests -max_p2p_request_payload_size = 1048576 -# maximum payload size for P2P replies -max_p2p_reply_payload_size = 10240 -# window size for P2P requests and replies -p2p_window_size = 4 - -# Subgroup configurations -# - The default subgroup settings -[SUBGROUP/DEFAULT] -# maximum payload size -# Any message with size large than this has to be broken -# down to multiple messages. -# Large message consumes memory space because the memory buffers -# have to be pre-allocated. -max_payload_size = 1048576 -# maximum reply payload size -# This is for replies generated by ordered sends in the subgroup -max_reply_payload_size = 10240 -# maximum smc (SST's small message multicast) payload size -# If the message size is smaller or equal to this size, -# it will be sent using SST multicast, otherwise it will -# try RDMC if the message size is smaller than max_payload_size. -max_smc_payload_size = 10240 -# block size depends on your max_payload_size. -# It is only relevant if you are ever going to send a message using RDMC. -# In that case, it should be set to the same value as the max_payload_size, -# if the max_payload_size is around 1 MB. For very large messages, the block # size should be a few MBs (1 is fine). -block_size = 1048576 -# message window size -# the length of the message pipeline -window_size = 16 -# the send algorithm for RDMC. Other options are -# chain_send, sequential_send, tree_send -rdmc_send_algorithm = binomial_send -# - SAMPLE for large message settings -[SUBGROUP/LARGE] -max_payload_size = 102400 -max_reply_payload_size = 102400 -max_smc_payload_size = 10240 -block_size = 10240 -window_size = 3 -rdmc_send_algorithm = binomial_send -# - SAMPLE for small message settings -[SUBGROUP/SMALL] -max_payload_size = 100 -max_reply_payload_size = 100 -max_smc_payload_size = 100 -# does not matter unless max_payload_size > max_smc_payload_size -block_size = 0 -window_size = 50 -rdmc_send_algorithm = binomial_send - -[SUBGROUP/VCS] -max_payload_size = 1048576 -max_reply_payload_size = 8192 -max_smc_payload_size = 10240 -# does not matter unless max_payload_size > max_smc_payload_size -block_size = 1048576 -window_size = 8 -rdmc_send_algorithm = binomial_send -num_shards = 1 -min_nodes = 1 -max_nodes = 4 - -[SUBGROUP/PCS] -max_payload_size = 8192 -max_reply_payload_size = 8192 -max_smc_payload_size = 10240 -# does not matter unless max_payload_size > max_smc_payload_size -block_size = 1048576 -window_size = 50 -rdmc_send_algorithm = binomial_send -num_shards = 1 -min_nodes = 1 -max_nodes = 4 - -# RDMA section contains configurations of the following -# - which RDMA device to use -# - device configurations -[RDMA] -# 1. provider = bgq|gni|mlx|netdir|psm|psm2|rxd|rxm|shm|tcp|udp|usnic|verbs -# possible options(only 'tcp' and 'verbs' providers are tested so far): -# bgq - The Blue Gene/Q Fabric Provider -# gni - The GNI Fabric Provider (Cray XC (TM) systems) -# mlx - The MLX Fabric Provider (UCX library) -# netdir - The Network Direct Fabric Provider (Microsoft Network Direct SPI) -# psm - The PSM Fabric Provider -# psm2 - The PSM2 Fabric Provider -# rxd - The RxD (RDM over DGRAM) Utility Provider -# rxm - The RxM (RDM over MSG) Utility Provider -# shm - The SHM Fabric Provider -# tcp - The Sockets Fabric Provider (TCP) -# udp - The UDP Fabric Provider -# usnic - The usNIC Fabric Provider (Cisco VIC) -# verbs - The Verbs Fabric Provider -provider = tcp - -# 2. domain -# For tcp provider, domain is the NIC name (ifconfig | grep -v -e "^ ") -# For verbs provider, domain is the device name (ibv_devices) -domain = lo - -# 3. tx_depth -# tx_depth applies to hints->tx_attr->size, where hint is a struct fi_info object. -# see https://ofiwg.github.io/libfabric/master/man/fi_getinfo.3.html -tx_depth = 256 - -# 4. rx_depth: -# rx_depth applies to hints->rx_attr->size, where hint is a struct fi_info object. -# see https://ofiwg.github.io/libfabric/master/man/fi_getinfo.3.html -rx_depth = 256 - -# Persistent configurations -[PERS] -# persistent directory for file system-based logfile. -file_path = .plog -ramdisk_path = /dev/shm/volatile_t -# Reset persistent data -# CAUTION: "reset = true" removes existing persisted data!!! -reset = false -# Max number of the log entries in each persistent, default to 1048576 -max_log_entry = 1048576 -# Max data size in bytes for each persistent, default to 512GB -max_data_size = 549755813888 -# Server clock skew delta in microseconds for put_by_time timestamp validation -server_clock_skew_delta_us = 1000 -# Temporal consistency delta -temporal_consistency_delta_us = 1000000 - - -# Logger configurations -[LOGGER] -# default log name -default_log_name = derecho_debug -# default log level -# Available options: -# trace,debug,info,warn,error,critical,off -default_log_level = off - -[LAYOUT] -json_layout_file = 'layout.json' - -# cascade service configuration -# TODO: add document for how to setup a cascade service. -[CASCADE] -cpu_cores = 0-3 -num_stateless_workers_for_multicast_ocdp = 2 -num_stateless_workers_for_p2p_ocdp = 2 -num_stateful_workers_for_multicast_ocdp = 2 -num_stateful_workers_for_p2p_ocdp = 2 -worker_cpu_affinity = ' -{ - "multicast_ocdp" : { - "0":"0", - "1":"1" - }, - "p2p_ocdp" : { - "0":"2", - "1":"3" - } -} -' diff --git a/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n1/derecho.cfg b/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n1/derecho.cfg deleted file mode 100644 index 1dfc4eab..00000000 --- a/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n1/derecho.cfg +++ /dev/null @@ -1,199 +0,0 @@ -[DERECHO] -# leader ip - the leader's ip address -contact_ip = 127.0.0.1 -# leader gms port - the leader's gms port -contact_port = 23580 -# my local id - each node should have a different id -local_id = 1 -# my local ip address -local_ip = 127.0.0.1 -# derecho gms port -gms_port = 23581 -# derecho rpc port -state_transfer_port = 28367 -# sst tcp port -sst_port = 37684 -# rdmc tcp port -rdmc_port = 31674 -# external port -external_port = 32646 -# this is the frequency of the failure detector thread. -# It is best to leave this to 1 ms for RDMA. If it is too high, -# you run the risk of overflowing the queue of outstanding sends. -heartbeat_ms = 100 -# sst poll completion queue timeout in millisecond -sst_poll_cq_timeout_ms = 100 -# disable partitioning safety -# By disabling this feature, the derecho is allowed to run when active -# members cannot form a majority. Please be aware of the 'split-brain' -# syndrome:https://en.wikipedia.org/wiki/Split-brain and make sure your -# application is fine with it. -# To help the user play with derecho at beginning, we disabled the -# partitioning safety. We suggest to set it to false for serious deployment -disable_partitioning_safety = false - -# maximum payload size for P2P requests -max_p2p_request_payload_size = 1048576 -# maximum payload size for P2P replies -max_p2p_reply_payload_size = 10240 -# window size for P2P requests and replies -p2p_window_size = 4 - -# Subgroup configurations -# - The default subgroup settings -[SUBGROUP/DEFAULT] -# maximum payload size -# Any message with size large than this has to be broken -# down to multiple messages. -# Large message consumes memory space because the memory buffers -# have to be pre-allocated. -max_payload_size = 1048576 -# maximum reply payload size -# This is for replies generated by ordered sends in the subgroup -max_reply_payload_size = 10240 -# maximum smc (SST's small message multicast) payload size -# If the message size is smaller or equal to this size, -# it will be sent using SST multicast, otherwise it will -# try RDMC if the message size is smaller than max_payload_size. -max_smc_payload_size = 10240 -# block size depends on your max_payload_size. -# It is only relevant if you are ever going to send a message using RDMC. -# In that case, it should be set to the same value as the max_payload_size, -# if the max_payload_size is around 1 MB. For very large messages, the block # size should be a few MBs (1 is fine). -block_size = 1048576 -# message window size -# the length of the message pipeline -window_size = 16 -# the send algorithm for RDMC. Other options are -# chain_send, sequential_send, tree_send -rdmc_send_algorithm = binomial_send -# - SAMPLE for large message settings -[SUBGROUP/LARGE] -max_payload_size = 102400 -max_reply_payload_size = 102400 -max_smc_payload_size = 10240 -block_size = 10240 -window_size = 3 -rdmc_send_algorithm = binomial_send -# - SAMPLE for small message settings -[SUBGROUP/SMALL] -max_payload_size = 100 -max_reply_payload_size = 100 -max_smc_payload_size = 100 -# does not matter unless max_payload_size > max_smc_payload_size -block_size = 0 -window_size = 50 -rdmc_send_algorithm = binomial_send - -[SUBGROUP/VCS] -max_payload_size = 1048576 -max_reply_payload_size = 8192 -max_smc_payload_size = 10240 -# does not matter unless max_payload_size > max_smc_payload_size -block_size = 1048576 -window_size = 8 -rdmc_send_algorithm = binomial_send -num_shards = 1 -min_nodes = 1 -max_nodes = 4 - -[SUBGROUP/PCS] -max_payload_size = 8192 -max_reply_payload_size = 8192 -max_smc_payload_size = 10240 -# does not matter unless max_payload_size > max_smc_payload_size -block_size = 1048576 -window_size = 50 -rdmc_send_algorithm = binomial_send -num_shards = 1 -min_nodes = 1 -max_nodes = 4 - -# RDMA section contains configurations of the following -# - which RDMA device to use -# - device configurations -[RDMA] -# 1. provider = bgq|gni|mlx|netdir|psm|psm2|rxd|rxm|shm|tcp|udp|usnic|verbs -# possible options(only 'tcp' and 'verbs' providers are tested so far): -# bgq - The Blue Gene/Q Fabric Provider -# gni - The GNI Fabric Provider (Cray XC (TM) systems) -# mlx - The MLX Fabric Provider (UCX library) -# netdir - The Network Direct Fabric Provider (Microsoft Network Direct SPI) -# psm - The PSM Fabric Provider -# psm2 - The PSM2 Fabric Provider -# rxd - The RxD (RDM over DGRAM) Utility Provider -# rxm - The RxM (RDM over MSG) Utility Provider -# shm - The SHM Fabric Provider -# tcp - The Sockets Fabric Provider (TCP) -# udp - The UDP Fabric Provider -# usnic - The usNIC Fabric Provider (Cisco VIC) -# verbs - The Verbs Fabric Provider -provider = tcp - -# 2. domain -# For tcp provider, domain is the NIC name (ifconfig | grep -v -e "^ ") -# For verbs provider, domain is the device name (ibv_devices) -domain = lo - -# 3. tx_depth -# tx_depth applies to hints->tx_attr->size, where hint is a struct fi_info object. -# see https://ofiwg.github.io/libfabric/master/man/fi_getinfo.3.html -tx_depth = 256 - -# 4. rx_depth: -# rx_depth applies to hints->rx_attr->size, where hint is a struct fi_info object. -# see https://ofiwg.github.io/libfabric/master/man/fi_getinfo.3.html -rx_depth = 256 - -# Persistent configurations -[PERS] -# persistent directory for file system-based logfile. -file_path = .plog -ramdisk_path = /dev/shm/volatile_t -# Reset persistent data -# CAUTION: "reset = true" removes existing persisted data!!! -reset = false -# Max number of the log entries in each persistent, default to 1048576 -max_log_entry = 1048576 -# Max data size in bytes for each persistent, default to 512GB -max_data_size = 549755813888 -# Server clock skew delta in microseconds for put_by_time timestamp validation -server_clock_skew_delta_us = 1000 -# Temporal consistency delta -temporal_consistency_delta_us = 1000000 - - -# Logger configurations -[LOGGER] -# default log name -default_log_name = derecho_debug -# default log level -# Available options: -# trace,debug,info,warn,error,critical,off -default_log_level = off - - -[LAYOUT] -json_layout_file = 'layout.json' - -# cascade service configuration -# TODO: add document for how to setup a cascade service. -[CASCADE] -cpu_cores = 0-3 -num_stateless_workers_for_multicast_ocdp = 2 -num_stateless_workers_for_p2p_ocdp = 2 -num_stateful_workers_for_multicast_ocdp = 2 -num_stateful_workers_for_p2p_ocdp = 2 -worker_cpu_affinity = ' -{ - "multicast_ocdp" : { - "0":"0", - "1":"1" - }, - "p2p_ocdp" : { - "0":"2", - "1":"3" - } -} -' - diff --git a/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n2/derecho.cfg b/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n2/derecho.cfg deleted file mode 100644 index e0b11aaf..00000000 --- a/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n2/derecho.cfg +++ /dev/null @@ -1,199 +0,0 @@ -[DERECHO] -# leader ip - the leader's ip address -contact_ip = 127.0.0.1 -# leader gms port - the leader's gms port -contact_port = 23580 -# my local id - each node should have a different id -local_id = 2 -# my local ip address -local_ip = 127.0.0.1 -# derecho gms port -gms_port = 23582 -# derecho rpc port -state_transfer_port = 28368 -# sst tcp port -sst_port = 37685 -# rdmc tcp port -rdmc_port = 31677 -# external port -external_port = 32647 -# this is the frequency of the failure detector thread. -# It is best to leave this to 1 ms for RDMA. If it is too high, -# you run the risk of overflowing the queue of outstanding sends. -heartbeat_ms = 100 -# sst poll completion queue timeout in millisecond -sst_poll_cq_timeout_ms = 100 -# disable partitioning safety -# By disabling this feature, the derecho is allowed to run when active -# members cannot form a majority. Please be aware of the 'split-brain' -# syndrome:https://en.wikipedia.org/wiki/Split-brain and make sure your -# application is fine with it. -# To help the user play with derecho at beginning, we disabled the -# partitioning safety. We suggest to set it to false for serious deployment -disable_partitioning_safety = false - -# maximum payload size for P2P requests -max_p2p_request_payload_size = 1048576 -# maximum payload size for P2P replies -max_p2p_reply_payload_size = 10240 -# window size for P2P requests and replies -p2p_window_size = 4 - -# Subgroup configurations -# - The default subgroup settings -[SUBGROUP/DEFAULT] -# maximum payload size -# Any message with size large than this has to be broken -# down to multiple messages. -# Large message consumes memory space because the memory buffers -# have to be pre-allocated. -max_payload_size = 1048576 -# maximum reply payload size -# This is for replies generated by ordered sends in the subgroup -max_reply_payload_size = 10240 -# maximum smc (SST's small message multicast) payload size -# If the message size is smaller or equal to this size, -# it will be sent using SST multicast, otherwise it will -# try RDMC if the message size is smaller than max_payload_size. -max_smc_payload_size = 10240 -# block size depends on your max_payload_size. -# It is only relevant if you are ever going to send a message using RDMC. -# In that case, it should be set to the same value as the max_payload_size, -# if the max_payload_size is around 1 MB. For very large messages, the block # size should be a few MBs (1 is fine). -block_size = 1048576 -# message window size -# the length of the message pipeline -window_size = 16 -# the send algorithm for RDMC. Other options are -# chain_send, sequential_send, tree_send -rdmc_send_algorithm = binomial_send -# - SAMPLE for large message settings -[SUBGROUP/LARGE] -max_payload_size = 102400 -max_reply_payload_size = 102400 -max_smc_payload_size = 10240 -block_size = 10240 -window_size = 3 -rdmc_send_algorithm = binomial_send -# - SAMPLE for small message settings -[SUBGROUP/SMALL] -max_payload_size = 100 -max_reply_payload_size = 100 -max_smc_payload_size = 100 -# does not matter unless max_payload_size > max_smc_payload_size -block_size = 0 -window_size = 50 -rdmc_send_algorithm = binomial_send - -[SUBGROUP/VCS] -max_payload_size = 1048576 -max_reply_payload_size = 8192 -max_smc_payload_size = 10240 -# does not matter unless max_payload_size > max_smc_payload_size -block_size = 1048576 -window_size = 8 -rdmc_send_algorithm = binomial_send -num_shards = 1 -min_nodes = 1 -max_nodes = 4 - -[SUBGROUP/PCS] -max_payload_size = 8192 -max_reply_payload_size = 8192 -max_smc_payload_size = 10240 -# does not matter unless max_payload_size > max_smc_payload_size -block_size = 1048576 -window_size = 50 -rdmc_send_algorithm = binomial_send -num_shards = 1 -min_nodes = 1 -max_nodes = 4 - -# RDMA section contains configurations of the following -# - which RDMA device to use -# - device configurations -[RDMA] -# 1. provider = bgq|gni|mlx|netdir|psm|psm2|rxd|rxm|shm|tcp|udp|usnic|verbs -# possible options(only 'tcp' and 'verbs' providers are tested so far): -# bgq - The Blue Gene/Q Fabric Provider -# gni - The GNI Fabric Provider (Cray XC (TM) systems) -# mlx - The MLX Fabric Provider (UCX library) -# netdir - The Network Direct Fabric Provider (Microsoft Network Direct SPI) -# psm - The PSM Fabric Provider -# psm2 - The PSM2 Fabric Provider -# rxd - The RxD (RDM over DGRAM) Utility Provider -# rxm - The RxM (RDM over MSG) Utility Provider -# shm - The SHM Fabric Provider -# tcp - The Sockets Fabric Provider (TCP) -# udp - The UDP Fabric Provider -# usnic - The usNIC Fabric Provider (Cisco VIC) -# verbs - The Verbs Fabric Provider -provider = tcp - -# 2. domain -# For tcp provider, domain is the NIC name (ifconfig | grep -v -e "^ ") -# For verbs provider, domain is the device name (ibv_devices) -domain = lo - -# 3. tx_depth -# tx_depth applies to hints->tx_attr->size, where hint is a struct fi_info object. -# see https://ofiwg.github.io/libfabric/master/man/fi_getinfo.3.html -tx_depth = 256 - -# 4. rx_depth: -# rx_depth applies to hints->rx_attr->size, where hint is a struct fi_info object. -# see https://ofiwg.github.io/libfabric/master/man/fi_getinfo.3.html -rx_depth = 256 - -# Persistent configurations -[PERS] -# persistent directory for file system-based logfile. -file_path = .plog -ramdisk_path = /dev/shm/volatile_t -# Reset persistent data -# CAUTION: "reset = true" removes existing persisted data!!! -reset = false -# Max number of the log entries in each persistent, default to 1048576 -max_log_entry = 1048576 -# Max data size in bytes for each persistent, default to 512GB -max_data_size = 549755813888 -# Server clock skew delta in microseconds for put_by_time timestamp validation -server_clock_skew_delta_us = 1000 -# Temporal consistency delta -temporal_consistency_delta_us = 1000000 - - -# Logger configurations -[LOGGER] -# default log name -default_log_name = derecho_debug -# default log level -# Available options: -# trace,debug,info,warn,error,critical,off -default_log_level = off - - -[LAYOUT] -json_layout_file = 'layout.json' - -# cascade service configuration -# TODO: add document for how to setup a cascade service. -[CASCADE] -cpu_cores = 0-3 -num_stateless_workers_for_multicast_ocdp = 2 -num_stateless_workers_for_p2p_ocdp = 2 -num_stateful_workers_for_multicast_ocdp = 2 -num_stateful_workers_for_p2p_ocdp = 2 -worker_cpu_affinity = ' -{ - "multicast_ocdp" : { - "0":"0", - "1":"1" - }, - "p2p_ocdp" : { - "0":"2", - "1":"3" - } -} -' - diff --git a/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n3/derecho.cfg b/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n3/derecho.cfg deleted file mode 100644 index 9a9e18e5..00000000 --- a/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/n3/derecho.cfg +++ /dev/null @@ -1,199 +0,0 @@ -[DERECHO] -# leader ip - the leader's ip address -contact_ip = 127.0.0.1 -# leader gms port - the leader's gms port -contact_port = 23580 -# my local id - each node should have a different id -local_id = 3 -# my local ip address -local_ip = 127.0.0.1 -# derecho gms port -gms_port = 23583 -# derecho rpc port -state_transfer_port = 28369 -# sst tcp port -sst_port = 37686 -# rdmc tcp port -rdmc_port = 31678 -# external port -external_port = 32648 -# this is the frequency of the failure detector thread. -# It is best to leave this to 1 ms for RDMA. If it is too high, -# you run the risk of overflowing the queue of outstanding sends. -heartbeat_ms = 100 -# sst poll completion queue timeout in millisecond -sst_poll_cq_timeout_ms = 100 -# disable partitioning safety -# By disabling this feature, the derecho is allowed to run when active -# members cannot form a majority. Please be aware of the 'split-brain' -# syndrome:https://en.wikipedia.org/wiki/Split-brain and make sure your -# application is fine with it. -# To help the user play with derecho at beginning, we disabled the -# partitioning safety. We suggest to set it to false for serious deployment -disable_partitioning_safety = false - -# maximum payload size for P2P requests -max_p2p_request_payload_size = 1048576 -# maximum payload size for P2P replies -max_p2p_reply_payload_size = 10240 -# window size for P2P requests and replies -p2p_window_size = 4 - -# Subgroup configurations -# - The default subgroup settings -[SUBGROUP/DEFAULT] -# maximum payload size -# Any message with size large than this has to be broken -# down to multiple messages. -# Large message consumes memory space because the memory buffers -# have to be pre-allocated. -max_payload_size = 1048576 -# maximum reply payload size -# This is for replies generated by ordered sends in the subgroup -max_reply_payload_size = 10240 -# maximum smc (SST's small message multicast) payload size -# If the message size is smaller or equal to this size, -# it will be sent using SST multicast, otherwise it will -# try RDMC if the message size is smaller than max_payload_size. -max_smc_payload_size = 10240 -# block size depends on your max_payload_size. -# It is only relevant if you are ever going to send a message using RDMC. -# In that case, it should be set to the same value as the max_payload_size, -# if the max_payload_size is around 1 MB. For very large messages, the block # size should be a few MBs (1 is fine). -block_size = 1048576 -# message window size -# the length of the message pipeline -window_size = 16 -# the send algorithm for RDMC. Other options are -# chain_send, sequential_send, tree_send -rdmc_send_algorithm = binomial_send -# - SAMPLE for large message settings -[SUBGROUP/LARGE] -max_payload_size = 102400 -max_reply_payload_size = 102400 -max_smc_payload_size = 10240 -block_size = 10240 -window_size = 3 -rdmc_send_algorithm = binomial_send -# - SAMPLE for small message settings -[SUBGROUP/SMALL] -max_payload_size = 100 -max_reply_payload_size = 100 -max_smc_payload_size = 100 -# does not matter unless max_payload_size > max_smc_payload_size -block_size = 0 -window_size = 50 -rdmc_send_algorithm = binomial_send - -[SUBGROUP/VCS] -max_payload_size = 1048576 -max_reply_payload_size = 8192 -max_smc_payload_size = 10240 -# does not matter unless max_payload_size > max_smc_payload_size -block_size = 1048576 -window_size = 8 -rdmc_send_algorithm = binomial_send -num_shards = 1 -min_nodes = 1 -max_nodes = 4 - -[SUBGROUP/PCS] -max_payload_size = 8192 -max_reply_payload_size = 8192 -max_smc_payload_size = 10240 -# does not matter unless max_payload_size > max_smc_payload_size -block_size = 1048576 -window_size = 50 -rdmc_send_algorithm = binomial_send -num_shards = 1 -min_nodes = 1 -max_nodes = 4 - -# RDMA section contains configurations of the following -# - which RDMA device to use -# - device configurations -[RDMA] -# 1. provider = bgq|gni|mlx|netdir|psm|psm2|rxd|rxm|shm|tcp|udp|usnic|verbs -# possible options(only 'tcp' and 'verbs' providers are tested so far): -# bgq - The Blue Gene/Q Fabric Provider -# gni - The GNI Fabric Provider (Cray XC (TM) systems) -# mlx - The MLX Fabric Provider (UCX library) -# netdir - The Network Direct Fabric Provider (Microsoft Network Direct SPI) -# psm - The PSM Fabric Provider -# psm2 - The PSM2 Fabric Provider -# rxd - The RxD (RDM over DGRAM) Utility Provider -# rxm - The RxM (RDM over MSG) Utility Provider -# shm - The SHM Fabric Provider -# tcp - The Sockets Fabric Provider (TCP) -# udp - The UDP Fabric Provider -# usnic - The usNIC Fabric Provider (Cisco VIC) -# verbs - The Verbs Fabric Provider -provider = tcp - -# 2. domain -# For tcp provider, domain is the NIC name (ifconfig | grep -v -e "^ ") -# For verbs provider, domain is the device name (ibv_devices) -domain = lo - -# 3. tx_depth -# tx_depth applies to hints->tx_attr->size, where hint is a struct fi_info object. -# see https://ofiwg.github.io/libfabric/master/man/fi_getinfo.3.html -tx_depth = 256 - -# 4. rx_depth: -# rx_depth applies to hints->rx_attr->size, where hint is a struct fi_info object. -# see https://ofiwg.github.io/libfabric/master/man/fi_getinfo.3.html -rx_depth = 256 - -# Persistent configurations -[PERS] -# persistent directory for file system-based logfile. -file_path = .plog -ramdisk_path = /dev/shm/volatile_t -# Reset persistent data -# CAUTION: "reset = true" removes existing persisted data!!! -reset = false -# Max number of the log entries in each persistent, default to 1048576 -max_log_entry = 1048576 -# Max data size in bytes for each persistent, default to 512GB -max_data_size = 549755813888 -# Server clock skew delta in microseconds for put_by_time timestamp validation -server_clock_skew_delta_us = 1000 -# Temporal consistency delta -temporal_consistency_delta_us = 1000000 - - -# Logger configurations -[LOGGER] -# default log name -default_log_name = derecho_debug -# default log level -# Available options: -# trace,debug,info,warn,error,critical,off -default_log_level = off - - -[LAYOUT] -json_layout_file = 'layout.json' - -# cascade service configuration -# TODO: add document for how to setup a cascade service. -[CASCADE] -cpu_cores = 0-3 -num_stateless_workers_for_multicast_ocdp = 2 -num_stateless_workers_for_p2p_ocdp = 2 -num_stateful_workers_for_multicast_ocdp = 2 -num_stateful_workers_for_p2p_ocdp = 2 -worker_cpu_affinity = ' -{ - "multicast_ocdp" : { - "0":"0", - "1":"1" - }, - "p2p_ocdp" : { - "0":"2", - "1":"3" - } -} -' - diff --git a/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/udl_dlls.cfg.tmp b/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/udl_dlls.cfg.tmp deleted file mode 100644 index 03ef26d3..00000000 --- a/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/udl_dlls.cfg.tmp +++ /dev/null @@ -1 +0,0 @@ -../../libconsole_printer_udl.so diff --git a/src/applications/tests/temporal_consistency_test/CMakeLists.txt b/src/applications/tests/temporal_consistency_test/CMakeLists.txt new file mode 100644 index 00000000..25496e6d --- /dev/null +++ b/src/applications/tests/temporal_consistency_test/CMakeLists.txt @@ -0,0 +1,59 @@ +# Test for get_by_time API +add_executable(test_get_by_time test_get_by_time.cpp) +target_include_directories(test_get_by_time PRIVATE + $ + $ + $ + $ +) +target_link_libraries(test_get_by_time cascade pthread) + +# Test for put_by_time API +add_executable(test_put_by_time test_put_by_time.cpp) +target_include_directories(test_put_by_time PRIVATE + $ + $ + $ +) +target_link_libraries(test_put_by_time cascade) + +add_custom_command(TARGET test_put_by_time POST_BUILD + COMMAND ${CMAKE_COMMAND} -E copy_directory ${CMAKE_CURRENT_SOURCE_DIR}/consistency_test_cfg + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/derecho.cfg + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n0/derecho.cfg + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/derecho.cfg + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n1/derecho.cfg + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/derecho.cfg + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n2/derecho.cfg + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/derecho.cfg + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n3/derecho.cfg + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/layout.json.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n0/layout.json + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/layout.json.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n1/layout.json + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/layout.json.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n2/layout.json + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/layout.json.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n3/layout.json + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/dfgs.json.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n0/dfgs.json + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/dfgs.json.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n1/dfgs.json + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/dfgs.json.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n2/dfgs.json + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/dfgs.json.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n3/dfgs.json + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/udl_dlls.cfg.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n0/udl_dlls.cfg + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/udl_dlls.cfg.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n1/udl_dlls.cfg + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/udl_dlls.cfg.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n2/udl_dlls.cfg + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/udl_dlls.cfg.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n3/udl_dlls.cfg + DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/consistency_test_cfg/layout.json.tmp + ${CMAKE_CURRENT_SOURCE_DIR}/consistency_test_cfg/dfgs.json.tmp + ${CMAKE_CURRENT_SOURCE_DIR}/consistency_test_cfg/udl_dlls.cfg.tmp + COMMENT "prepare consistency tests configuration" +) diff --git a/src/applications/tests/temporal_consistency_test/consistency_test_cfg/derecho.cfg b/src/applications/tests/temporal_consistency_test/consistency_test_cfg/derecho.cfg new file mode 100644 index 00000000..8c3b7c26 --- /dev/null +++ b/src/applications/tests/temporal_consistency_test/consistency_test_cfg/derecho.cfg @@ -0,0 +1,98 @@ +[DERECHO] +contact_ip = 127.0.0.1 +contact_port = 23580 +local_ip = 127.0.0.1 +heartbeat_ms = 100 +sst_poll_cq_timeout_ms = 100 +disable_partitioning_safety = false + +max_p2p_request_payload_size = 1048576 +max_p2p_reply_payload_size = 10240 +p2p_window_size = 4 + +[SUBGROUP/DEFAULT] +max_payload_size = 1048576 +max_reply_payload_size = 10240 +max_smc_payload_size = 10240 +block_size = 1048576 +window_size = 16 +rdmc_send_algorithm = binomial_send + +[SUBGROUP/LARGE] +max_payload_size = 102400 +max_reply_payload_size = 102400 +max_smc_payload_size = 10240 +block_size = 10240 +window_size = 3 +rdmc_send_algorithm = binomial_send + +[SUBGROUP/SMALL] +max_payload_size = 100 +max_reply_payload_size = 100 +max_smc_payload_size = 100 +block_size = 0 +window_size = 50 +rdmc_send_algorithm = binomial_send + +[SUBGROUP/VCS] +max_payload_size = 1048576 +max_reply_payload_size = 8192 +max_smc_payload_size = 10240 +block_size = 1048576 +window_size = 8 +rdmc_send_algorithm = binomial_send +num_shards = 1 +min_nodes = 1 +max_nodes = 4 + +[SUBGROUP/PCS] +max_payload_size = 8192 +max_reply_payload_size = 8192 +max_smc_payload_size = 10240 +block_size = 1048576 +window_size = 50 +rdmc_send_algorithm = binomial_send +num_shards = 1 +min_nodes = 1 +max_nodes = 4 + +[RDMA] +provider = tcp +domain = lo +tx_depth = 256 +rx_depth = 256 + +[PERS] +file_path = .plog +ramdisk_path = /dev/shm/volatile_t +reset = false +max_log_entry = 1048576 +max_data_size = 549755813888 +server_clock_skew_delta_us = 1000 +temporal_consistency_delta_us = 1000000 + +[LOGGER] +default_log_name = derecho_debug +default_log_level = off + +[LAYOUT] +json_layout_file = 'layout.json' + +[CASCADE] +cpu_cores = 0-3 +num_stateless_workers_for_multicast_ocdp = 2 +num_stateless_workers_for_p2p_ocdp = 2 +num_stateful_workers_for_multicast_ocdp = 2 +num_stateful_workers_for_p2p_ocdp = 2 +worker_cpu_affinity = ' +{ + "multicast_ocdp" : { + "0":"0", + "1":"1" + }, + "p2p_ocdp" : { + "0":"2", + "1":"3" + } +} +' diff --git a/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/dfgs.json.tmp b/src/applications/tests/temporal_consistency_test/consistency_test_cfg/dfgs.json.tmp similarity index 100% rename from src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/dfgs.json.tmp rename to src/applications/tests/temporal_consistency_test/consistency_test_cfg/dfgs.json.tmp diff --git a/src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/layout.json.tmp b/src/applications/tests/temporal_consistency_test/consistency_test_cfg/layout.json.tmp similarity index 100% rename from src/applications/tests/cascade_as_subgroup_classes/consistency_test_cfg/layout.json.tmp rename to src/applications/tests/temporal_consistency_test/consistency_test_cfg/layout.json.tmp diff --git a/src/applications/tests/temporal_consistency_test/consistency_test_cfg/n0/derecho_node.cfg b/src/applications/tests/temporal_consistency_test/consistency_test_cfg/n0/derecho_node.cfg new file mode 100644 index 00000000..237d0991 --- /dev/null +++ b/src/applications/tests/temporal_consistency_test/consistency_test_cfg/n0/derecho_node.cfg @@ -0,0 +1,7 @@ +[DERECHO] +local_id = 0 +gms_port = 23580 +state_transfer_port = 28366 +sst_port = 37683 +rdmc_port = 31675 +external_port = 32645 diff --git a/src/applications/tests/temporal_consistency_test/consistency_test_cfg/n1/derecho_node.cfg b/src/applications/tests/temporal_consistency_test/consistency_test_cfg/n1/derecho_node.cfg new file mode 100644 index 00000000..09fa4c3e --- /dev/null +++ b/src/applications/tests/temporal_consistency_test/consistency_test_cfg/n1/derecho_node.cfg @@ -0,0 +1,7 @@ +[DERECHO] +local_id = 1 +gms_port = 23581 +state_transfer_port = 28367 +sst_port = 37684 +rdmc_port = 31674 +external_port = 32646 diff --git a/src/applications/tests/temporal_consistency_test/consistency_test_cfg/n2/derecho_node.cfg b/src/applications/tests/temporal_consistency_test/consistency_test_cfg/n2/derecho_node.cfg new file mode 100644 index 00000000..9a5192e2 --- /dev/null +++ b/src/applications/tests/temporal_consistency_test/consistency_test_cfg/n2/derecho_node.cfg @@ -0,0 +1,7 @@ +[DERECHO] +local_id = 2 +gms_port = 23582 +state_transfer_port = 28368 +sst_port = 37685 +rdmc_port = 31677 +external_port = 32647 diff --git a/src/applications/tests/temporal_consistency_test/consistency_test_cfg/n3/derecho_node.cfg b/src/applications/tests/temporal_consistency_test/consistency_test_cfg/n3/derecho_node.cfg new file mode 100644 index 00000000..335be927 --- /dev/null +++ b/src/applications/tests/temporal_consistency_test/consistency_test_cfg/n3/derecho_node.cfg @@ -0,0 +1,7 @@ +[DERECHO] +local_id = 3 +gms_port = 23583 +state_transfer_port = 28369 +sst_port = 37686 +rdmc_port = 31678 +external_port = 32648 diff --git a/src/applications/tests/temporal_consistency_test/consistency_test_cfg/udl_dlls.cfg.tmp b/src/applications/tests/temporal_consistency_test/consistency_test_cfg/udl_dlls.cfg.tmp new file mode 100644 index 00000000..e69de29b diff --git a/src/applications/tests/cascade_as_subgroup_classes/test_get_by_time.cpp b/src/applications/tests/temporal_consistency_test/test_get_by_time.cpp similarity index 99% rename from src/applications/tests/cascade_as_subgroup_classes/test_get_by_time.cpp rename to src/applications/tests/temporal_consistency_test/test_get_by_time.cpp index bc18080e..2b7918a8 100644 --- a/src/applications/tests/cascade_as_subgroup_classes/test_get_by_time.cpp +++ b/src/applications/tests/temporal_consistency_test/test_get_by_time.cpp @@ -61,7 +61,6 @@ int main(int argc, char** argv) { // Use current time for each put uint64_t timestamp_us = get_walltime() / 1000ULL; - std::cout << " Putting key: " << obj.key << " at timestamp: " << timestamp_us << std::endl; auto result = capi.template put_by_time( obj, timestamp_us, subgroup_index, shard_index, false); @@ -71,6 +70,7 @@ int main(int argc, char** argv) { auto reply = reply_future.second.get(); uint64_t actual_timestamp = std::get<1>(reply); test_data.push_back({obj.key, actual_timestamp}); + std::cout << " Put key: " << obj.key << " at timestamp: " << timestamp_us << std::endl; std::cout << " ✓ Put successful. Version: " << std::get<0>(reply) << ", Timestamp: " << actual_timestamp << std::endl; } diff --git a/src/applications/tests/cascade_as_subgroup_classes/test_put_by_time.cpp b/src/applications/tests/temporal_consistency_test/test_put_by_time.cpp similarity index 93% rename from src/applications/tests/cascade_as_subgroup_classes/test_put_by_time.cpp rename to src/applications/tests/temporal_consistency_test/test_put_by_time.cpp index 24895f26..e4535fac 100644 --- a/src/applications/tests/cascade_as_subgroup_classes/test_put_by_time.cpp +++ b/src/applications/tests/temporal_consistency_test/test_put_by_time.cpp @@ -42,8 +42,6 @@ int main(int argc, char** argv) { // Get current time in microseconds uint64_t current_time_us = get_walltime() / 1000ULL; - std::cout << " Current time: " << current_time_us << " microseconds" << std::endl; - std::cout << " Calling put_by_time..." << std::endl; auto result = capi.template put_by_time(obj, current_time_us, subgroup_index, shard_index, false); @@ -53,6 +51,7 @@ int main(int argc, char** argv) { std::cout << " ✓ Success! Version: " << std::get<0>(reply) << ", Timestamp: " << std::get<1>(reply) << " microseconds" << std::endl; } + std::cout << " Testing Current time: " << current_time_us << " microseconds" << std::endl; } std::cout << std::endl; @@ -66,12 +65,9 @@ int main(int argc, char** argv) { obj.previous_version_by_key = persistent::INVALID_VERSION; // Use a timestamp 500ms in the future (within 1 second delta) + // Calculate and call immediately to avoid terminal output delays consuming the margin uint64_t current_time_us = get_walltime() / 1000ULL; uint64_t future_time_us = current_time_us + 500000ULL; // 500ms = 500000 microseconds - std::cout << " Current time: " << current_time_us << " microseconds" << std::endl; - std::cout << " Future time: " << future_time_us << " microseconds" << std::endl; - std::cout << " Calling put_by_time..." << std::endl; - auto result = capi.template put_by_time(obj, future_time_us, subgroup_index, shard_index, false); // Wait for result @@ -80,6 +76,10 @@ int main(int argc, char** argv) { std::cout << " ✓ Success! Version: " << std::get<0>(reply) << ", Timestamp: " << std::get<1>(reply) << " microseconds" << std::endl; } + + // Print diagnostic info after the call + std::cout << " Testing Current time (at call): " << current_time_us << " microseconds" << std::endl; + std::cout << " Testing Future time: " << future_time_us << " microseconds" << std::endl; } std::cout << std::endl; @@ -95,10 +95,6 @@ int main(int argc, char** argv) { // Use a timestamp 2 seconds in the past (older than 1 second delta) uint64_t current_time_us = get_walltime() / 1000ULL; uint64_t old_time_us = current_time_us - 2000000ULL; // 2 seconds = 2000000 microseconds - std::cout << " Current time: " << current_time_us << " microseconds" << std::endl; - std::cout << " Old time: " << old_time_us << " microseconds" << std::endl; - std::cout << " Calling put_by_time (should be rejected)..." << std::endl; - try { auto result = capi.template put_by_time(obj, old_time_us, subgroup_index, shard_index, false); // If we get here, the call didn't throw - check if result is empty @@ -115,6 +111,8 @@ int main(int argc, char** argv) { } catch (const derecho::derecho_exception& e) { std::cout << " ✓ Correctly rejected with exception: " << e.what() << std::endl; } + std::cout << " Testing Current time: " << current_time_us << " microseconds" << std::endl; + std::cout << " Old time: " << old_time_us << " microseconds" << std::endl; } std::cout << std::endl; @@ -150,8 +148,6 @@ int main(int argc, char** argv) { // Use a timestamp that's guaranteed to be higher than the current HLC // (regular_put_timestamp + 100ms to be safe) uint64_t custom_timestamp_us = regular_put_timestamp + 100000ULL; // 100ms ahead of regular put - std::cout << " put_by_time with custom timestamp: " << custom_timestamp_us << " microseconds" << std::endl; - std::cout << " (100ms ahead of regular put to ensure HLC advances)" << std::endl; auto result2 = capi.template put_by_time(obj2, custom_timestamp_us, subgroup_index, shard_index, false); uint64_t custom_put_timestamp = 0; for (auto& reply_future : result2.get()) { @@ -167,6 +163,8 @@ int main(int argc, char** argv) { std::cout << " ✗ Error: Custom timestamp mismatch. Expected: " << custom_timestamp_us << ", Got: " << custom_put_timestamp << std::endl; } + std::cout << " Testing put_by_time with custom timestamp: " << custom_timestamp_us << " microseconds" << std::endl; + std::cout << " (100ms ahead of regular put to ensure HLC advances)" << std::endl; } std::cout << std::endl; @@ -181,9 +179,6 @@ int main(int argc, char** argv) { // Use a specific timestamp (e.g., 1000000000 microseconds = 1000 seconds) uint64_t specific_timestamp_us = 1000000000ULL; - std::cout << " Using specific timestamp: " << specific_timestamp_us << " microseconds" << std::endl; - std::cout << " Note: This will be rejected if it's too old" << std::endl; - try { auto result = capi.template put_by_time(obj, specific_timestamp_us, subgroup_index, shard_index, false); bool has_results = false; @@ -199,6 +194,8 @@ int main(int argc, char** argv) { } catch (const derecho::derecho_exception& e) { std::cout << " Timestamp was rejected with exception: " << e.what() << std::endl; } + std::cout << " Using specific timestamp: " << specific_timestamp_us << " microseconds" << std::endl; + std::cout << " Note: This will be rejected if it's too old" << std::endl; } std::cout << std::endl; From e52a966efc452543b849505828eab76c29d9ceea Mon Sep 17 00:00:00 2001 From: aliciayuting Date: Sun, 18 Jan 2026 17:14:52 -0500 Subject: [PATCH 5/5] added Epsilon to account for client to server delay --- .../cascade/detail/persistent_store_impl.hpp | 19 +++++++++---------- .../cascade/detail/service_client_impl.hpp | 2 +- .../cascade/detail/volatile_store_impl.hpp | 19 +++++++++---------- 3 files changed, 19 insertions(+), 21 deletions(-) diff --git a/include/cascade/detail/persistent_store_impl.hpp b/include/cascade/detail/persistent_store_impl.hpp index 57990daf..e4d1c315 100644 --- a/include/cascade/detail/persistent_store_impl.hpp +++ b/include/cascade/detail/persistent_store_impl.hpp @@ -46,16 +46,15 @@ template version_tuple PersistentCascadeStore::put_with_timestamp(const VT& value, uint64_t timestamp_us, bool as_trigger) const { debug_enter_func_with_args("value.get_key_ref()={}, timestamp_us={}", value.get_key_ref(), timestamp_us); - // TODO: decide if we want to enable server side timestamp validation and what values to set - // uint64_t now_us = get_walltime() / 1000ULL; // Convert nanoseconds to microseconds - // uint64_t temporal_consistency_delta_us = derecho::getConfUInt64(derecho::Conf::PERS_TEMPORAL_CONSISTENCY_DELTA_US); - // uint64_t server_clock_skew_delta_us = derecho::getConfUInt64(derecho::Conf::PERS_SERVER_CLOCK_SKEW_DELTA_US); - // uint64_t threshold_us = now_us - temporal_consistency_delta_us - server_clock_skew_delta_us; - // if (timestamp_us < threshold_us) { - // dbg_default_warn("put_with_timestamp: rejecting timestamp {} us (threshold: {} us, now: {} us)", - // timestamp_us, threshold_us, now_us); - // throw derecho::derecho_exception("put_with_timestamp: timestamp is too old (older than now - temporal_consistency_delta - server_clock_skew_delta)"); - // } + uint64_t now_us = get_walltime() / 1000ULL; // Convert nanoseconds to microseconds + uint64_t client_server_epsilon_us = derecho::getConfUInt64(derecho::Conf::PERS_CLIENT_SERVER_EPSILON_US); + uint64_t server_clock_skew_delta_us = derecho::getConfUInt64(derecho::Conf::PERS_SERVER_CLOCK_SKEW_DELTA_US); + uint64_t threshold_us = now_us - client_server_epsilon_us - server_clock_skew_delta_us; + if (timestamp_us < threshold_us) { + dbg_default_warn("put_with_timestamp: rejecting timestamp {} us (threshold: {} us, now: {} us)", + timestamp_us, threshold_us, now_us); + throw derecho::derecho_exception("put_with_timestamp: timestamp is too old (older than now - client_server_epsilon - server_clock_skew_delta)"); + } LOG_TIMESTAMP_BY_TAG(TLT_PERSISTENT_PUT_START, group, value); diff --git a/include/cascade/detail/service_client_impl.hpp b/include/cascade/detail/service_client_impl.hpp index fda5d10e..560fe1f2 100644 --- a/include/cascade/detail/service_client_impl.hpp +++ b/include/cascade/detail/service_client_impl.hpp @@ -766,7 +766,7 @@ derecho::rpc::QueryResults ServiceClient::put_by const ObjectType& value, const uint64_t& timestamp_us, bool as_trigger) { // STEP 1 - get key if constexpr (!std::is_base_of_v,ObjectType>) { - throw derecho::derecho_exception(std::string("ServiceClient<>::put_by_time() only support object of type ICascadeObject,but we get ") + typeid(ObjectType).name()); + throw derecho::derecho_exception(std::string("ServiceClient<>::put_by_time() only support object of type ICascadeObject,but we got ") + typeid(ObjectType).name()); } // STEP 2 - validate timestamp: reject if timestamp_us < get_walltime() - Delta diff --git a/include/cascade/detail/volatile_store_impl.hpp b/include/cascade/detail/volatile_store_impl.hpp index 4dfbb024..acbaf704 100644 --- a/include/cascade/detail/volatile_store_impl.hpp +++ b/include/cascade/detail/volatile_store_impl.hpp @@ -39,16 +39,15 @@ template version_tuple VolatileCascadeStore::put_with_timestamp(const VT& value, uint64_t timestamp_us, bool as_trigger) const { debug_enter_func_with_args("value.get_key_ref={}, timestamp_us={}", value.get_key_ref(), timestamp_us); - // TODO: decide if we want to enable server side timestamp validation and what values to set - // uint64_t now_us = get_walltime() / 1000ULL; // Convert nanoseconds to microseconds - // uint64_t temporal_consistency_delta_us = derecho::getConfUInt64(derecho::Conf::PERS_TEMPORAL_CONSISTENCY_DELTA_US); - // uint64_t server_clock_skew_delta_us = derecho::getConfUInt64(derecho::Conf::PERS_SERVER_CLOCK_SKEW_DELTA_US); - // uint64_t threshold_us = now_us - temporal_consistency_delta_us - server_clock_skew_delta_us; - // if (timestamp_us < threshold_us) { - // dbg_default_warn("put_with_timestamp: rejecting timestamp {} us (threshold: {} us, now: {} us)", - // timestamp_us, threshold_us, now_us); - // throw derecho::derecho_exception("put_with_timestamp: timestamp is too old (older than now - temporal_consistency_delta - server_clock_skew_delta)"); - // } + uint64_t now_us = get_walltime() / 1000ULL; // Convert nanoseconds to microseconds + uint64_t client_server_epsilon_us = derecho::getConfUInt64(derecho::Conf::PERS_CLIENT_SERVER_EPSILON_US); + uint64_t server_clock_skew_delta_us = derecho::getConfUInt64(derecho::Conf::PERS_SERVER_CLOCK_SKEW_DELTA_US); + uint64_t threshold_us = now_us - client_server_epsilon_us - server_clock_skew_delta_us; + if (timestamp_us < threshold_us) { + dbg_default_warn("put_with_timestamp: rejecting timestamp {} us (threshold: {} us, now: {} us)", + timestamp_us, threshold_us, now_us); + throw derecho::derecho_exception("put_with_timestamp: timestamp is too old (older than now - client_server_epsilon - server_clock_skew_delta)"); + } LOG_TIMESTAMP_BY_TAG(TLT_VOLATILE_PUT_START, group, value);