diff --git a/include/cascade/detail/persistent_store_impl.hpp b/include/cascade/detail/persistent_store_impl.hpp index 10355312..e4d1c315 100644 --- a/include/cascade/detail/persistent_store_impl.hpp +++ b/include/cascade/detail/persistent_store_impl.hpp @@ -42,6 +42,35 @@ version_tuple PersistentCascadeStore::put(const VT& value, b return ret; } +template +version_tuple PersistentCascadeStore::put_with_timestamp(const VT& value, uint64_t timestamp_us, bool as_trigger) const { + debug_enter_func_with_args("value.get_key_ref()={}, timestamp_us={}", value.get_key_ref(), timestamp_us); + + uint64_t now_us = get_walltime() / 1000ULL; // Convert nanoseconds to microseconds + uint64_t client_server_epsilon_us = derecho::getConfUInt64(derecho::Conf::PERS_CLIENT_SERVER_EPSILON_US); + uint64_t server_clock_skew_delta_us = derecho::getConfUInt64(derecho::Conf::PERS_SERVER_CLOCK_SKEW_DELTA_US); + uint64_t threshold_us = now_us - client_server_epsilon_us - server_clock_skew_delta_us; + if (timestamp_us < threshold_us) { + dbg_default_warn("put_with_timestamp: rejecting timestamp {} us (threshold: {} us, now: {} us)", + timestamp_us, threshold_us, now_us); + throw derecho::derecho_exception("put_with_timestamp: timestamp is too old (older than now - client_server_epsilon - server_clock_skew_delta)"); + } + + LOG_TIMESTAMP_BY_TAG(TLT_PERSISTENT_PUT_START, group, value); + + derecho::Replicated& subgroup_handle = group->template get_subgroup(this->subgroup_index); + auto results = subgroup_handle.template ordered_send_with_timestamp(timestamp_us, value, as_trigger); + auto& replies = results.get(); + version_tuple ret{CURRENT_VERSION, 0}; + for(auto& reply_pair : replies) { + ret = reply_pair.second.get(); + } + + LOG_TIMESTAMP_BY_TAG(TLT_PERSISTENT_PUT_END, group, value); + debug_leave_func_with_value("version=0x{:x},timestamp={}us", std::get<0>(ret), std::get<1>(ret)); + return ret; +} + template void PersistentCascadeStore::put_and_forget(const VT& value, bool as_trigger) const { debug_enter_func_with_args("value.get_key_ref()={}", value.get_key_ref()); diff --git a/include/cascade/detail/service_client_impl.hpp b/include/cascade/detail/service_client_impl.hpp index 4c0269e0..560fe1f2 100644 --- a/include/cascade/detail/service_client_impl.hpp +++ b/include/cascade/detail/service_client_impl.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -64,7 +65,8 @@ std::unique_ptr client_stub_factory() { template ServiceClient::ServiceClient(derecho::Group, CascadeTypes...>* _group_ptr) : external_group_ptr(nullptr), - group_ptr(_group_ptr) { + group_ptr(_group_ptr), + server_clock_skew_delta_us(derecho::getConfUInt64(derecho::Conf::PERS_SERVER_CLOCK_SKEW_DELTA_US)) { if(group_ptr == nullptr) { this->external_group_ptr = std::make_unique, CascadeTypes...>>( client_stub_factory>, @@ -758,6 +760,110 @@ void ServiceClient::collective_trigger_put( } } +template +template +derecho::rpc::QueryResults ServiceClient::put_by_time( + const ObjectType& value, const uint64_t& timestamp_us, bool as_trigger) { + // STEP 1 - get key + if constexpr (!std::is_base_of_v,ObjectType>) { + throw derecho::derecho_exception(std::string("ServiceClient<>::put_by_time() only support object of type ICascadeObject,but we got ") + typeid(ObjectType).name()); + } + + // STEP 2 - validate timestamp: reject if timestamp_us < get_walltime() - Delta + uint64_t now_us = get_walltime() / 1000ULL; // Convert nanoseconds to microseconds + uint64_t delta_us = server_clock_skew_delta_us / 1000ULL; // Convert nanoseconds to microseconds + + if (timestamp_us < (now_us - delta_us)) { + dbg_default_warn("put_by_time: timestamp {}us is too old (now={}us, delta={}us), rejecting", + timestamp_us, now_us, delta_us); + throw derecho::derecho_exception("put_by_time: timestamp is too old (older than now - delta)"); + } + + // STEP 3 - get shard + uint32_t subgroup_type_index,subgroup_index,shard_index; + std::tie(subgroup_type_index,subgroup_index,shard_index) = this->template key_to_shard(value.get_key_ref()); + + // STEP 4 - call recursive put_by_time + return this->template type_recursive_put_by_time(subgroup_type_index,value,timestamp_us,subgroup_index,shard_index,as_trigger); +} + +template +template +derecho::rpc::QueryResults ServiceClient::put_by_time( + const typename SubgroupType::ObjectType& value, + const uint64_t& timestamp_us, + uint32_t subgroup_index, + uint32_t shard_index, + bool as_trigger) { + LOG_SERVICE_CLIENT_TIMESTAMP(TLT_SERVICE_CLIENT_PUT_BY_TIME_START, + (std::is_base_of::value?value.get_message_id():0)); + // Validate timestamp: timestamp_us must be >= (now - delta) + uint64_t now_ns = get_walltime(); + uint64_t timestamp_ns = timestamp_us * 1000ULL; // Convert microseconds to nanoseconds + uint64_t delta_ns = server_clock_skew_delta_us * 1000ULL; + + if (timestamp_ns < now_ns - delta_ns) { + // Timestamp is too old, reject the request + throw derecho::derecho_exception("put_by_time: timestamp is too old (older than now - delta)"); + } + if (!is_external_client()) { + std::lock_guard lck(this->group_ptr_mutex); + if (static_cast(group_ptr->template get_my_shard(subgroup_index)) == shard_index) { + // ordered put_by_time as a shard member - use ordered_send with timestamp + auto& subgroup_handle = group_ptr->template get_subgroup(subgroup_index); + return subgroup_handle.template ordered_send_with_timestamp(timestamp_us, value, as_trigger); + } else { + // p2p put_with_timestamp - send to a shard member who will do ordered_send_with_timestamp + node_id_t node_id = pick_member_by_policy(subgroup_index,shard_index,value.get_key_ref()); + try { + auto& subgroup_handle = group_ptr->template get_subgroup(subgroup_index); + return subgroup_handle.template p2p_send(node_id,value,timestamp_us,as_trigger); + } catch (derecho::invalid_subgroup_exception& ex) { + auto& subgroup_handle = group_ptr->template get_nonmember_subgroup(subgroup_index); + return subgroup_handle.template p2p_send(node_id,value,timestamp_us,as_trigger); + } + } + } else { + // External client - use put_with_timestamp P2P call + std::lock_guard lck(this->external_group_ptr_mutex); + auto& caller = external_group_ptr->template get_subgroup_caller(subgroup_index); + node_id_t node_id = pick_member_by_policy(subgroup_index,shard_index,value.get_key_ref()); + return caller.template p2p_send(node_id,value,timestamp_us,as_trigger); + } +} + +template +template +derecho::rpc::QueryResults ServiceClient::type_recursive_put_by_time( + uint32_t type_index, + const ObjectType& object, + const uint64_t& timestamp_us, + uint32_t subgroup_index, + uint32_t shard_index, + bool as_trigger) { + if (type_index == 0) { + return this->template put_by_time(object, timestamp_us, subgroup_index, shard_index, as_trigger); + } else { + return this->template type_recursive_put_by_time(type_index-1,object,timestamp_us,subgroup_index,shard_index,as_trigger); + } +} + +template +template +derecho::rpc::QueryResults ServiceClient::type_recursive_put_by_time( + uint32_t type_index, + const ObjectType& object, + const uint64_t& timestamp_us, + uint32_t subgroup_index, + uint32_t shard_index, + bool as_trigger) { + if (type_index == 0) { + return this->template put_by_time(object, timestamp_us, subgroup_index, shard_index, as_trigger); + } else { + throw derecho::derecho_exception(std::string(__PRETTY_FUNCTION__) + ": type index is out of boundary."); + } +} + template template derecho::rpc::QueryResults ServiceClient::remove( diff --git a/include/cascade/detail/trigger_store_impl.hpp b/include/cascade/detail/trigger_store_impl.hpp index 0f85aaf0..0dc5d59a 100644 --- a/include/cascade/detail/trigger_store_impl.hpp +++ b/include/cascade/detail/trigger_store_impl.hpp @@ -23,6 +23,12 @@ version_tuple TriggerCascadeNoStore::put(const VT& value, bool a return {persistent::INVALID_VERSION, 0}; } +template +version_tuple TriggerCascadeNoStore::put_with_timestamp(const VT& value, uint64_t timestamp_us, bool as_trigger) const { + dbg_default_warn("Calling unsupported func:{}", __PRETTY_FUNCTION__); + return {persistent::INVALID_VERSION, 0}; +} + template void TriggerCascadeNoStore::put_and_forget(const VT& value, bool as_trigger) const { dbg_default_warn("Calling unsupported func:{}", __PRETTY_FUNCTION__); diff --git a/include/cascade/detail/volatile_store_impl.hpp b/include/cascade/detail/volatile_store_impl.hpp index 69e90c8a..acbaf704 100644 --- a/include/cascade/detail/volatile_store_impl.hpp +++ b/include/cascade/detail/volatile_store_impl.hpp @@ -35,6 +35,35 @@ version_tuple VolatileCascadeStore::put(const VT& value, bool as return ret; } +template +version_tuple VolatileCascadeStore::put_with_timestamp(const VT& value, uint64_t timestamp_us, bool as_trigger) const { + debug_enter_func_with_args("value.get_key_ref={}, timestamp_us={}", value.get_key_ref(), timestamp_us); + + uint64_t now_us = get_walltime() / 1000ULL; // Convert nanoseconds to microseconds + uint64_t client_server_epsilon_us = derecho::getConfUInt64(derecho::Conf::PERS_CLIENT_SERVER_EPSILON_US); + uint64_t server_clock_skew_delta_us = derecho::getConfUInt64(derecho::Conf::PERS_SERVER_CLOCK_SKEW_DELTA_US); + uint64_t threshold_us = now_us - client_server_epsilon_us - server_clock_skew_delta_us; + if (timestamp_us < threshold_us) { + dbg_default_warn("put_with_timestamp: rejecting timestamp {} us (threshold: {} us, now: {} us)", + timestamp_us, threshold_us, now_us); + throw derecho::derecho_exception("put_with_timestamp: timestamp is too old (older than now - client_server_epsilon - server_clock_skew_delta)"); + } + + LOG_TIMESTAMP_BY_TAG(TLT_VOLATILE_PUT_START, group, value); + + derecho::Replicated& subgroup_handle = group->template get_subgroup(this->subgroup_index); + auto results = subgroup_handle.template ordered_send_with_timestamp(timestamp_us, value, as_trigger); + auto& replies = results.get(); + version_tuple ret{CURRENT_VERSION, 0}; + for(auto& reply_pair : replies) { + ret = reply_pair.second.get(); + } + + LOG_TIMESTAMP_BY_TAG(TLT_VOLATILE_PUT_END, group, value); + debug_leave_func_with_value("version=0x{:x},timestamp={}us", std::get<0>(ret), std::get<1>(ret)); + return ret; +} + template void VolatileCascadeStore::put_and_forget(const VT& value, bool as_trigger) const { debug_enter_func_with_args("value.get_key_ref={}", value.get_key_ref()); diff --git a/include/cascade/persistent_store.hpp b/include/cascade/persistent_store.hpp index a32704c6..4e7350ff 100644 --- a/include/cascade/persistent_store.hpp +++ b/include/cascade/persistent_store.hpp @@ -42,6 +42,7 @@ class PersistentCascadeStore : public ICascadeStore, REGISTER_RPC_FUNCTIONS_WITH_NOTIFICATION(PersistentCascadeStore, P2P_TARGETS( put, + put_with_timestamp, put_and_forget, #ifdef ENABLE_EVALUATION perf_put, @@ -86,6 +87,7 @@ class PersistentCascadeStore : public ICascadeStore, #endif // ENABLE_EVALUATION virtual void trigger_put(const VT& value) const override; virtual version_tuple put(const VT& value, bool as_trigger) const override; + virtual version_tuple put_with_timestamp(const VT& value, uint64_t timestamp_us, bool as_trigger) const; virtual void put_and_forget(const VT& value, bool as_trigger) const override; #ifdef ENABLE_EVALUATION virtual double perf_put(const uint32_t max_payload_size, const uint64_t duration_sec) const override; diff --git a/include/cascade/service_client.hpp b/include/cascade/service_client.hpp index ce6e015a..8da56664 100644 --- a/include/cascade/service_client.hpp +++ b/include/cascade/service_client.hpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -118,6 +119,8 @@ class ServiceClient { do_hash>> member_cache; mutable std::shared_mutex member_cache_mutex; + // config clock skew delta (in microseconds) for time consistency + const uint64_t server_clock_skew_delta_us; /** * 'object_pool_info_cache' is a local cache for object pool metadata. This cache is used to accelerate the * object access process. If an object pool does not exists, it will be loaded from metadata service. @@ -519,7 +522,66 @@ class ServiceClient { void collective_trigger_put(const typename SubgroupType::ObjectType& object, uint32_t subgroup_index, std::unordered_map>>& nodes_and_futures); + /** + * "put_by_time" writes an object to a given subgroup/shard with a custom timestamp. + * + * @param[in] object the object to write. + * @param[in] timestamp_us the timestamp in microseconds to use for the message header. + * The request will be rejected if timestamp_us < get_walltime() - Delta. + * @param[in] as_trigger If true, the object will NOT apply to the K/V store. The object will only be + * used to update the state. + */ + template + derecho::rpc::QueryResults put_by_time(const ObjectType& object, const uint64_t& timestamp_us, bool as_trigger = false); + /** + * "put_by_time" writes an object to a given subgroup/shard with a custom timestamp. + * + * @param[in] object the object to write. + * @param[in] timestamp_us the timestamp in microseconds to use for the message header. + * The request will be rejected if timestamp_us < get_walltime() - Delta. + * @param[in] subgroup_index the subgroup index of CascadeType + * @param[in] shard_index the shard index. + * @param[in] as_trigger If true, the object will NOT apply to the K/V store. The object will only be + * used to update the state. + */ + template + derecho::rpc::QueryResults put_by_time(const typename SubgroupType::ObjectType& object, + const uint64_t& timestamp_us, uint32_t subgroup_index, uint32_t shard_index, bool as_trigger = false); + +protected: + /** + * "type_recursive_put" is a helper function for internal use only. + * @param[in] type_index the index of the subgroup type in the CascadeTypes... list. And the FirstType, + * SecondType, ..., RestTypes should be in the same order. + * @param[in] object the object to write + * @param[in] timestamp_us the timestamp in microseconds to use for the message header. + * @param[in] subgroup_index + * the subgroup index in the subgroup type designated by type_index + * @param[in] shard_index the shard index + * @param[in] as_trigger If true, the object will NOT apply to the K/V store. The object will only be + * used to update the state. + * + * @return a future to the version and timestamp of the put operation. + */ + template + derecho::rpc::QueryResults type_recursive_put_by_time( + uint32_t type_index, + const ObjectType& object, + const uint64_t& timestamp_us, + uint32_t subgroup_index, + uint32_t shard_index, + bool as_trigger = false); + + template + derecho::rpc::QueryResults type_recursive_put_by_time( + uint32_t type_index, + const ObjectType& object, + const uint64_t& timestamp_us, + uint32_t subgroup_index, + uint32_t shard_index, + bool as_trigger = false); +public: /** * "remove" deletes an object with the given key. * diff --git a/include/cascade/trigger_store.hpp b/include/cascade/trigger_store.hpp index 95fdb921..d8cab056 100644 --- a/include/cascade/trigger_store.hpp +++ b/include/cascade/trigger_store.hpp @@ -37,6 +37,7 @@ class TriggerCascadeNoStore : public ICascadeStore, REGISTER_RPC_FUNCTIONS_WITH_NOTIFICATION(TriggerCascadeNoStore, P2P_TARGETS( put, + put_with_timestamp, put_and_forget, #ifdef ENABLE_EVALUATION perf_put, @@ -81,6 +82,7 @@ class TriggerCascadeNoStore : public ICascadeStore, #endif // ENABLE_EVALUATION virtual void trigger_put(const VT& value) const override; virtual version_tuple put(const VT& value, bool as_trigger) const override; + virtual version_tuple put_with_timestamp(const VT& value, uint64_t timestamp_us, bool as_trigger) const; virtual void put_and_forget(const VT& value, bool as_trigger) const override; #ifdef ENABLE_EVALUATION virtual double perf_put(const uint32_t max_payload_size, const uint64_t duration_sec) const override; diff --git a/include/cascade/utils.hpp b/include/cascade/utils.hpp index a39f70a2..8f2a5c1c 100644 --- a/include/cascade/utils.hpp +++ b/include/cascade/utils.hpp @@ -35,7 +35,6 @@ inline uint64_t get_time_us(bool use_wall_clock = true) { return get_time_ns(use_wall_clock)/INT64_1E3; } - /** * decompose the prefix into tokens. Please note that the token after the last separator is not considered a part of * the prefix and hence dropped if the "prefix_only" is true @@ -204,6 +203,7 @@ class OpenLoopLatencyCollector: public OpenLoopLatencyCollectorClient { #define TLT_SERVICE_CLIENT_MULTI_LIST_KEYS_START (1009) #define TLT_SERVICE_CLIENT_GET_SIZE_START (1010) #define TLT_SERVICE_CLIENT_MULTI_GET_SIZE_START (1011) +#define TLT_SERVICE_CLIENT_PUT_BY_TIME_START (1012) /* For VolatileCascadeStore: * ::put(): diff --git a/include/cascade/volatile_store.hpp b/include/cascade/volatile_store.hpp index 6f2d166f..ffcdd5e1 100644 --- a/include/cascade/volatile_store.hpp +++ b/include/cascade/volatile_store.hpp @@ -47,6 +47,7 @@ class VolatileCascadeStore : public ICascadeStore, REGISTER_RPC_FUNCTIONS_WITH_NOTIFICATION(VolatileCascadeStore, P2P_TARGETS( put, + put_with_timestamp, put_and_forget, #ifdef ENABLE_EVALUATION perf_put, @@ -91,6 +92,7 @@ class VolatileCascadeStore : public ICascadeStore, #endif // ENABLE_EVALUATION virtual void trigger_put(const VT& value) const override; virtual version_tuple put(const VT& value, bool as_trigger) const override; + virtual version_tuple put_with_timestamp(const VT& value, uint64_t timestamp_us, bool as_trigger) const; #ifdef ENABLE_EVALUATION virtual double perf_put(const uint32_t max_payload_size, const uint64_t duration_sec) const override; #endif // ENABLE_EVALUATION diff --git a/src/applications/tests/CMakeLists.txt b/src/applications/tests/CMakeLists.txt index 9d04a2ef..b1e5ec45 100644 --- a/src/applications/tests/CMakeLists.txt +++ b/src/applications/tests/CMakeLists.txt @@ -2,3 +2,4 @@ add_subdirectory(unit_tests) add_subdirectory(cascade_as_subgroup_classes) add_subdirectory(user_defined_logic) add_subdirectory(pipeline) +add_subdirectory(temporal_consistency_test) \ No newline at end of file diff --git a/src/applications/tests/cascade_as_subgroup_classes/CMakeLists.txt b/src/applications/tests/cascade_as_subgroup_classes/CMakeLists.txt index 70004723..56decc9c 100644 --- a/src/applications/tests/cascade_as_subgroup_classes/CMakeLists.txt +++ b/src/applications/tests/cascade_as_subgroup_classes/CMakeLists.txt @@ -22,4 +22,4 @@ add_custom_command(TARGET cli_example POST_BUILD ${CMAKE_CURRENT_BINARY_DIR}/cli_example_cfg COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/perf_shutdown.py ${CMAKE_CURRENT_BINARY_DIR}/perf_shutdown.py -) +) \ No newline at end of file diff --git a/src/applications/tests/temporal_consistency_test/CMakeLists.txt b/src/applications/tests/temporal_consistency_test/CMakeLists.txt new file mode 100644 index 00000000..25496e6d --- /dev/null +++ b/src/applications/tests/temporal_consistency_test/CMakeLists.txt @@ -0,0 +1,59 @@ +# Test for get_by_time API +add_executable(test_get_by_time test_get_by_time.cpp) +target_include_directories(test_get_by_time PRIVATE + $ + $ + $ + $ +) +target_link_libraries(test_get_by_time cascade pthread) + +# Test for put_by_time API +add_executable(test_put_by_time test_put_by_time.cpp) +target_include_directories(test_put_by_time PRIVATE + $ + $ + $ +) +target_link_libraries(test_put_by_time cascade) + +add_custom_command(TARGET test_put_by_time POST_BUILD + COMMAND ${CMAKE_COMMAND} -E copy_directory ${CMAKE_CURRENT_SOURCE_DIR}/consistency_test_cfg + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/derecho.cfg + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n0/derecho.cfg + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/derecho.cfg + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n1/derecho.cfg + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/derecho.cfg + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n2/derecho.cfg + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/derecho.cfg + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n3/derecho.cfg + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/layout.json.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n0/layout.json + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/layout.json.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n1/layout.json + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/layout.json.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n2/layout.json + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/layout.json.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n3/layout.json + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/dfgs.json.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n0/dfgs.json + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/dfgs.json.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n1/dfgs.json + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/dfgs.json.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n2/dfgs.json + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/dfgs.json.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n3/dfgs.json + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/udl_dlls.cfg.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n0/udl_dlls.cfg + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/udl_dlls.cfg.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n1/udl_dlls.cfg + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/udl_dlls.cfg.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n2/udl_dlls.cfg + COMMAND ln -sf ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/udl_dlls.cfg.tmp + ${CMAKE_CURRENT_BINARY_DIR}/consistency_test_cfg/n3/udl_dlls.cfg + DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/consistency_test_cfg/layout.json.tmp + ${CMAKE_CURRENT_SOURCE_DIR}/consistency_test_cfg/dfgs.json.tmp + ${CMAKE_CURRENT_SOURCE_DIR}/consistency_test_cfg/udl_dlls.cfg.tmp + COMMENT "prepare consistency tests configuration" +) diff --git a/src/applications/tests/temporal_consistency_test/consistency_test_cfg/derecho.cfg b/src/applications/tests/temporal_consistency_test/consistency_test_cfg/derecho.cfg new file mode 100644 index 00000000..8c3b7c26 --- /dev/null +++ b/src/applications/tests/temporal_consistency_test/consistency_test_cfg/derecho.cfg @@ -0,0 +1,98 @@ +[DERECHO] +contact_ip = 127.0.0.1 +contact_port = 23580 +local_ip = 127.0.0.1 +heartbeat_ms = 100 +sst_poll_cq_timeout_ms = 100 +disable_partitioning_safety = false + +max_p2p_request_payload_size = 1048576 +max_p2p_reply_payload_size = 10240 +p2p_window_size = 4 + +[SUBGROUP/DEFAULT] +max_payload_size = 1048576 +max_reply_payload_size = 10240 +max_smc_payload_size = 10240 +block_size = 1048576 +window_size = 16 +rdmc_send_algorithm = binomial_send + +[SUBGROUP/LARGE] +max_payload_size = 102400 +max_reply_payload_size = 102400 +max_smc_payload_size = 10240 +block_size = 10240 +window_size = 3 +rdmc_send_algorithm = binomial_send + +[SUBGROUP/SMALL] +max_payload_size = 100 +max_reply_payload_size = 100 +max_smc_payload_size = 100 +block_size = 0 +window_size = 50 +rdmc_send_algorithm = binomial_send + +[SUBGROUP/VCS] +max_payload_size = 1048576 +max_reply_payload_size = 8192 +max_smc_payload_size = 10240 +block_size = 1048576 +window_size = 8 +rdmc_send_algorithm = binomial_send +num_shards = 1 +min_nodes = 1 +max_nodes = 4 + +[SUBGROUP/PCS] +max_payload_size = 8192 +max_reply_payload_size = 8192 +max_smc_payload_size = 10240 +block_size = 1048576 +window_size = 50 +rdmc_send_algorithm = binomial_send +num_shards = 1 +min_nodes = 1 +max_nodes = 4 + +[RDMA] +provider = tcp +domain = lo +tx_depth = 256 +rx_depth = 256 + +[PERS] +file_path = .plog +ramdisk_path = /dev/shm/volatile_t +reset = false +max_log_entry = 1048576 +max_data_size = 549755813888 +server_clock_skew_delta_us = 1000 +temporal_consistency_delta_us = 1000000 + +[LOGGER] +default_log_name = derecho_debug +default_log_level = off + +[LAYOUT] +json_layout_file = 'layout.json' + +[CASCADE] +cpu_cores = 0-3 +num_stateless_workers_for_multicast_ocdp = 2 +num_stateless_workers_for_p2p_ocdp = 2 +num_stateful_workers_for_multicast_ocdp = 2 +num_stateful_workers_for_p2p_ocdp = 2 +worker_cpu_affinity = ' +{ + "multicast_ocdp" : { + "0":"0", + "1":"1" + }, + "p2p_ocdp" : { + "0":"2", + "1":"3" + } +} +' diff --git a/src/applications/tests/temporal_consistency_test/consistency_test_cfg/dfgs.json.tmp b/src/applications/tests/temporal_consistency_test/consistency_test_cfg/dfgs.json.tmp new file mode 100644 index 00000000..9889f609 --- /dev/null +++ b/src/applications/tests/temporal_consistency_test/consistency_test_cfg/dfgs.json.tmp @@ -0,0 +1,14 @@ +[ + { + "id": "4a2ec3ae-9cd6-11eb-90e5-0242ac110002", + "desc": "console_printer DFG", + "graph": [ + { + "pathname": "/console_printer", + "shard_dispatcher_list": ["one"], + "user_defined_logic_list": ["48e60f7c-8500-11eb-8755-0242ac110002"], + "destinations": [{}] + } + ] + } +] diff --git a/src/applications/tests/temporal_consistency_test/consistency_test_cfg/layout.json.tmp b/src/applications/tests/temporal_consistency_test/consistency_test_cfg/layout.json.tmp new file mode 100644 index 00000000..7bdb1016 --- /dev/null +++ b/src/applications/tests/temporal_consistency_test/consistency_test_cfg/layout.json.tmp @@ -0,0 +1,50 @@ +[ + { + "type_alias": "CascadeMetadataService", + "layout": [ + { + "min_nodes_by_shard": ["1"], + "max_nodes_by_shard": ["1"], + "delivery_modes_by_shard": ["Ordered"], + "reserved_node_ids_by_shard": [["0"]], + "profiles_by_shard": ["DEFAULT"] + } + ] + }, + { + "type_alias": "VolatileCascadeStoreWithStringKey", + "layout": [ + { + "min_nodes_by_shard": ["2"], + "max_nodes_by_shard": ["2"], + "delivery_modes_by_shard": ["Ordered"], + "reserved_node_ids_by_shard": [["1","2"]], + "profiles_by_shard": ["DEFAULT"] + } + ] + }, + { + "type_alias": "PersistentCascadeStoreWithStringKey", + "layout": [ + { + "min_nodes_by_shard": ["2"], + "max_nodes_by_shard": ["2"], + "delivery_modes_by_shard": ["Ordered"], + "reserved_node_ids_by_shard": [["1","2"]], + "profiles_by_shard": ["DEFAULT"] + } + ] + }, + { + "type_alias": "TriggerCascadeNoStoreWithStringKey", + "layout": [ + { + "min_nodes_by_shard": ["2"], + "max_nodes_by_shard": ["2"], + "delivery_modes_by_shard": ["Raw"], + "reserved_node_ids_by_shard": [["1","2"]], + "profiles_by_shard": ["DEFAULT"] + } + ] + } +]' diff --git a/src/applications/tests/temporal_consistency_test/consistency_test_cfg/n0/derecho_node.cfg b/src/applications/tests/temporal_consistency_test/consistency_test_cfg/n0/derecho_node.cfg new file mode 100644 index 00000000..237d0991 --- /dev/null +++ b/src/applications/tests/temporal_consistency_test/consistency_test_cfg/n0/derecho_node.cfg @@ -0,0 +1,7 @@ +[DERECHO] +local_id = 0 +gms_port = 23580 +state_transfer_port = 28366 +sst_port = 37683 +rdmc_port = 31675 +external_port = 32645 diff --git a/src/applications/tests/temporal_consistency_test/consistency_test_cfg/n1/derecho_node.cfg b/src/applications/tests/temporal_consistency_test/consistency_test_cfg/n1/derecho_node.cfg new file mode 100644 index 00000000..09fa4c3e --- /dev/null +++ b/src/applications/tests/temporal_consistency_test/consistency_test_cfg/n1/derecho_node.cfg @@ -0,0 +1,7 @@ +[DERECHO] +local_id = 1 +gms_port = 23581 +state_transfer_port = 28367 +sst_port = 37684 +rdmc_port = 31674 +external_port = 32646 diff --git a/src/applications/tests/temporal_consistency_test/consistency_test_cfg/n2/derecho_node.cfg b/src/applications/tests/temporal_consistency_test/consistency_test_cfg/n2/derecho_node.cfg new file mode 100644 index 00000000..9a5192e2 --- /dev/null +++ b/src/applications/tests/temporal_consistency_test/consistency_test_cfg/n2/derecho_node.cfg @@ -0,0 +1,7 @@ +[DERECHO] +local_id = 2 +gms_port = 23582 +state_transfer_port = 28368 +sst_port = 37685 +rdmc_port = 31677 +external_port = 32647 diff --git a/src/applications/tests/temporal_consistency_test/consistency_test_cfg/n3/derecho_node.cfg b/src/applications/tests/temporal_consistency_test/consistency_test_cfg/n3/derecho_node.cfg new file mode 100644 index 00000000..335be927 --- /dev/null +++ b/src/applications/tests/temporal_consistency_test/consistency_test_cfg/n3/derecho_node.cfg @@ -0,0 +1,7 @@ +[DERECHO] +local_id = 3 +gms_port = 23583 +state_transfer_port = 28369 +sst_port = 37686 +rdmc_port = 31678 +external_port = 32648 diff --git a/src/applications/tests/temporal_consistency_test/consistency_test_cfg/udl_dlls.cfg.tmp b/src/applications/tests/temporal_consistency_test/consistency_test_cfg/udl_dlls.cfg.tmp new file mode 100644 index 00000000..e69de29b diff --git a/src/applications/tests/temporal_consistency_test/test_get_by_time.cpp b/src/applications/tests/temporal_consistency_test/test_get_by_time.cpp new file mode 100644 index 00000000..2b7918a8 --- /dev/null +++ b/src/applications/tests/temporal_consistency_test/test_get_by_time.cpp @@ -0,0 +1,307 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace derecho::cascade; + +/** + * Test client for get_by_time API with temporal consistency checks + * + * This program tests the get_by_time functionality with the new temporal consistency + * implementation that uses PERS_TEMPORAL_CONSISTENCY_DELTA and PERS_SERVER_CLOCK_SKEW_DELTA_US. + * + * Tests: + * 1. get_by_time with too recent timestamp (should return INVALID_VERSION) + * 2. get_by_time with timestamp in middle range (threshold2 <= ts < threshold1) + * 3. get_by_time with old enough timestamp (ts < threshold2) + * 4. Verify closest version selection + * 5. Test with multiple puts at different times + */ +int main(int argc, char** argv) { + try { + // Initialize the Cascade client (singleton) + auto& capi = ServiceClientAPI::get_service_client(); + + std::cout << "=== Testing get_by_time API with Temporal Consistency ===" << std::endl; + std::cout << std::endl; + + // Get configuration values for reference + uint64_t temporal_consistency_delta_us = derecho::getConfUInt64(derecho::Conf::PERS_TEMPORAL_CONSISTENCY_DELTA_US); + uint64_t clock_skew_delta_us = derecho::getConfUInt64(derecho::Conf::PERS_SERVER_CLOCK_SKEW_DELTA_US); + + std::cout << "Configuration:" << std::endl; + std::cout << " PERS_TEMPORAL_CONSISTENCY_DELTA: " << temporal_consistency_delta_us << " microseconds" << std::endl; + std::cout << " PERS_SERVER_CLOCK_SKEW_DELTA_US: " << clock_skew_delta_us << " microseconds" << std::endl; + std::cout << std::endl; + + // Use subgroup index 0, shard index 0 for testing + uint32_t subgroup_index = 0; + uint32_t shard_index = 0; + + // First, populate the store with some data at known timestamps + std::cout << "=== Setup: Populating store with test data ===" << std::endl; + std::vector> test_data; // key -> timestamp + + // Put several objects with known timestamps + for (int i = 0; i < 5; i++) { + ObjectWithStringKey obj; + obj.key = "test_key_" + std::to_string(i); + obj.blob = Blob(reinterpret_cast(("test_value_" + std::to_string(i)).c_str()), + ("test_value_" + std::to_string(i)).length()); + obj.previous_version = persistent::INVALID_VERSION; + obj.previous_version_by_key = persistent::INVALID_VERSION; + + // Use current time for each put + uint64_t timestamp_us = get_walltime() / 1000ULL; + + auto result = capi.template put_by_time( + obj, timestamp_us, subgroup_index, shard_index, false); + + // Wait for result and store the actual timestamp + for (auto& reply_future : result.get()) { + auto reply = reply_future.second.get(); + uint64_t actual_timestamp = std::get<1>(reply); + test_data.push_back({obj.key, actual_timestamp}); + std::cout << " Put key: " << obj.key << " at timestamp: " << timestamp_us << std::endl; + std::cout << " ✓ Put successful. Version: " << std::get<0>(reply) + << ", Timestamp: " << actual_timestamp << std::endl; + } + + // Small delay between puts to ensure different timestamps + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + std::cout << std::endl; + + // Wait a bit to ensure data is stable + std::cout << "Waiting for data to stabilize..." << std::endl; + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + std::cout << std::endl; + + // Test 1: get_by_time with too recent timestamp (should return INVALID_VERSION) + std::cout << "=== Test 1: get_by_time with too recent timestamp ===" << std::endl; + { + uint64_t now_us = get_walltime() / 1000ULL; + uint64_t threshold1 = now_us - temporal_consistency_delta_us - 2 * clock_skew_delta_us; + uint64_t recent_time = threshold1 - 1000; // Just before threshold1 + + std::cout << " Current time: " << now_us << " microseconds" << std::endl; + std::cout << " Threshold1 (too recent): " << threshold1 << " microseconds" << std::endl; + std::cout << " Requesting time: " << recent_time << " microseconds" << std::endl; + std::cout << " Expected: INVALID_VERSION (time too recent, not stable)" << std::endl; + + auto result = capi.template get_by_time( + test_data[0].first, recent_time, true, subgroup_index, shard_index); + + bool found_valid = false; + for (auto& reply_future : result.get()) { + auto obj = reply_future.second.get(); + if (obj.key == test_data[0].first) { + found_valid = true; + std::cout << " ✗ Unexpected: Found object with key: " << obj.key << std::endl; + } + } + + if (!found_valid) { + std::cout << " ✓ Correctly returned INVALID_VERSION (empty result)" << std::endl; + } + } + std::cout << std::endl; + + // Test 2: get_by_time with timestamp in middle range (threshold2 <= ts < threshold1) + std::cout << "=== Test 2: get_by_time with timestamp in middle range ===" << std::endl; + { + uint64_t now_us = get_walltime() / 1000ULL; + uint64_t threshold1 = now_us - temporal_consistency_delta_us - 2 * clock_skew_delta_us; + uint64_t threshold2 = now_us - temporal_consistency_delta_us - 3 * clock_skew_delta_us; + + // Use a timestamp in the middle range + uint64_t middle_time = (threshold1 + threshold2) / 2; + + std::cout << " Current time: " << now_us << " microseconds" << std::endl; + std::cout << " Threshold1: " << threshold1 << " microseconds" << std::endl; + std::cout << " Threshold2: " << threshold2 << " microseconds" << std::endl; + std::cout << " Requesting time: " << middle_time << " microseconds" << std::endl; + std::cout << " Expected: Closest version <= threshold1" << std::endl; + + // Use the oldest test data key + if (!test_data.empty()) { + auto result = capi.template get_by_time( + test_data[0].first, middle_time, true, subgroup_index, shard_index); + + bool found = false; + for (auto& reply_future : result.get()) { + auto obj = reply_future.second.get(); + std::cout << " ✓ Found object with key: " << obj.key + << ", timestamp: " << obj.timestamp_us << std::endl; + if (obj.timestamp_us <= threshold1) { + std::cout << " ✓ Timestamp is within threshold1" << std::endl; + } else { + std::cout << " ✗ Warning: Timestamp exceeds threshold1" << std::endl; + } + found = true; + } + + if (!found) { + std::cout << " ✗ No object found" << std::endl; + } + } + } + std::cout << std::endl; + + // Test 3: get_by_time with old enough timestamp (ts < threshold2) + std::cout << "=== Test 3: get_by_time with old enough timestamp ===" << std::endl; + { + uint64_t now_us = get_walltime() / 1000ULL; + uint64_t threshold2 = now_us - temporal_consistency_delta_us - 3 * clock_skew_delta_us; + + // Use a timestamp well before threshold2 + uint64_t old_time = threshold2 - clock_skew_delta_us; + + std::cout << " Current time: " << now_us << " microseconds" << std::endl; + std::cout << " Threshold2: " << threshold2 << " microseconds" << std::endl; + std::cout << " Requesting time: " << old_time << " microseconds" << std::endl; + std::cout << " Expected: Closest version <= (requested_time + clock_skew_delta)" << std::endl; + + if (!test_data.empty()) { + auto result = capi.template get_by_time( + test_data[0].first, old_time, true, subgroup_index, shard_index); + + bool found = false; + for (auto& reply_future : result.get()) { + auto obj = reply_future.second.get(); + uint64_t max_allowed = old_time + clock_skew_delta_us; + std::cout << " ✓ Found object with key: " << obj.key + << ", timestamp: " << obj.timestamp_us << std::endl; + std::cout << " Max allowed timestamp: " << max_allowed << std::endl; + if (obj.timestamp_us <= max_allowed) { + std::cout << " ✓ Timestamp is within allowed range" << std::endl; + } else { + std::cout << " ✗ Warning: Timestamp exceeds allowed range" << std::endl; + } + found = true; + } + + if (!found) { + std::cout << " ✗ No object found" << std::endl; + } + } + } + std::cout << std::endl; + + // Test 4: Verify closest version selection with multiple versions + std::cout << "=== Test 4: Verify closest version selection ===" << std::endl; + { + if (test_data.size() >= 3) { + // Get the timestamps of the first and third objects + uint64_t first_timestamp = test_data[0].second; + uint64_t third_timestamp = test_data[2].second; + + // Request a time between first and third + uint64_t target_time = (first_timestamp + third_timestamp) / 2; + + std::cout << " First object timestamp: " << first_timestamp << std::endl; + std::cout << " Third object timestamp: " << third_timestamp << std::endl; + std::cout << " Target time: " << target_time << std::endl; + std::cout << " Expected: Closest version to target_time" << std::endl; + + // Test with the key that has multiple versions + auto result = capi.template get_by_time( + test_data[0].first, target_time, true, subgroup_index, shard_index); + + bool found = false; + for (auto& reply_future : result.get()) { + auto obj = reply_future.second.get(); + uint64_t distance = (obj.timestamp_us > target_time) ? + (obj.timestamp_us - target_time) : + (target_time - obj.timestamp_us); + std::cout << " ✓ Found object with timestamp: " << obj.timestamp_us << std::endl; + std::cout << " Distance from target: " << distance << " microseconds" << std::endl; + found = true; + } + + if (!found) { + std::cout << " ✗ No object found" << std::endl; + } + } else { + std::cout << " Skipped: Need at least 3 test objects" << std::endl; + } + } + std::cout << std::endl; + + // Test 5: get_by_time with exact timestamp match + std::cout << "=== Test 5: get_by_time with exact timestamp match ===" << std::endl; + { + if (!test_data.empty()) { + uint64_t exact_timestamp = test_data[0].second; + + std::cout << " Requesting exact timestamp: " << exact_timestamp << std::endl; + std::cout << " Expected: Object with matching timestamp" << std::endl; + + auto result = capi.template get_by_time( + test_data[0].first, exact_timestamp, true, subgroup_index, shard_index); + + bool found = false; + for (auto& reply_future : result.get()) { + auto obj = reply_future.second.get(); + std::cout << " ✓ Found object with key: " << obj.key + << ", timestamp: " << obj.timestamp_us << std::endl; + if (obj.timestamp_us == exact_timestamp) { + std::cout << " ✓ Exact timestamp match!" << std::endl; + } else { + std::cout << " Note: Returned closest timestamp: " << obj.timestamp_us << std::endl; + } + found = true; + } + + if (!found) { + std::cout << " ✗ No object found" << std::endl; + } + } + } + std::cout << std::endl; + + // Test 6: get_by_time with very old timestamp (should still work if data exists) + std::cout << "=== Test 6: get_by_time with very old timestamp ===" << std::endl; + { + if (!test_data.empty()) { + // Use a timestamp from 10 seconds ago + uint64_t now_us = get_walltime() / 1000ULL; + uint64_t very_old_time = now_us - 10000000ULL; // 10 seconds ago + + std::cout << " Current time: " << now_us << " microseconds" << std::endl; + std::cout << " Requesting very old time: " << very_old_time << " microseconds" << std::endl; + std::cout << " Expected: Closest version if data exists" << std::endl; + + auto result = capi.template get_by_time( + test_data[0].first, very_old_time, true, subgroup_index, shard_index); + + bool found = false; + for (auto& reply_future : result.get()) { + auto obj = reply_future.second.get(); + std::cout << " ✓ Found object with timestamp: " << obj.timestamp_us << std::endl; + found = true; + } + + if (!found) { + std::cout << " Note: No object found (may be expected if timestamp is too old)" << std::endl; + } + } + } + std::cout << std::endl; + + std::cout << "=== All tests completed ===" << std::endl; + return 0; + + } catch (const std::exception& e) { + std::cerr << "Exception: " << e.what() << std::endl; + return -1; + } +} \ No newline at end of file diff --git a/src/applications/tests/temporal_consistency_test/test_put_by_time.cpp b/src/applications/tests/temporal_consistency_test/test_put_by_time.cpp new file mode 100644 index 00000000..e4535fac --- /dev/null +++ b/src/applications/tests/temporal_consistency_test/test_put_by_time.cpp @@ -0,0 +1,210 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace derecho::cascade; + +/** + * Test client for put_by_time API + * + * This program tests the new put_by_time functionality using explicit subgroup/shard indices. + * Tests: + * 1. Valid timestamp test - should succeed + * 2. Invalid (too old) timestamp test - should be rejected + * 3. Comparison with regular put - verify timestamps are different + */ +int main(int argc, char** argv) { + try { + // Initialize the Cascade client (singleton) + auto& capi = ServiceClientAPI::get_service_client(); + + std::cout << "=== Testing put_by_time API ===" << std::endl; + std::cout << std::endl; + + // Use subgroup index 0, shard index 0 for testing + uint32_t subgroup_index = 0; + uint32_t shard_index = 0; + + // Test 1: put_by_time with valid timestamp (current time) + std::cout << "Test 1: put_by_time with current timestamp" << std::endl; + { + ObjectWithStringKey obj; + obj.key = "test_key_1"; + obj.blob = Blob(reinterpret_cast("test_value_1"), 12); + obj.previous_version = persistent::INVALID_VERSION; + obj.previous_version_by_key = persistent::INVALID_VERSION; + + // Get current time in microseconds + uint64_t current_time_us = get_walltime() / 1000ULL; + + auto result = capi.template put_by_time(obj, current_time_us, subgroup_index, shard_index, false); + + // Wait for result + for (auto& reply_future : result.get()) { + auto reply = reply_future.second.get(); + std::cout << " ✓ Success! Version: " << std::get<0>(reply) + << ", Timestamp: " << std::get<1>(reply) << " microseconds" << std::endl; + } + std::cout << " Testing Current time: " << current_time_us << " microseconds" << std::endl; + } + std::cout << std::endl; + + // Test 2: put_by_time with future timestamp (should work) + std::cout << "Test 2: put_by_time with future timestamp (within delta)" << std::endl; + { + ObjectWithStringKey obj; + obj.key = "test_key_2"; + obj.blob = Blob(reinterpret_cast("test_value_2"), 12); + obj.previous_version = persistent::INVALID_VERSION; + obj.previous_version_by_key = persistent::INVALID_VERSION; + + // Use a timestamp 500ms in the future (within 1 second delta) + // Calculate and call immediately to avoid terminal output delays consuming the margin + uint64_t current_time_us = get_walltime() / 1000ULL; + uint64_t future_time_us = current_time_us + 500000ULL; // 500ms = 500000 microseconds + auto result = capi.template put_by_time(obj, future_time_us, subgroup_index, shard_index, false); + + // Wait for result + for (auto& reply_future : result.get()) { + auto reply = reply_future.second.get(); + std::cout << " ✓ Success! Version: " << std::get<0>(reply) + << ", Timestamp: " << std::get<1>(reply) << " microseconds" << std::endl; + } + + // Print diagnostic info after the call + std::cout << " Testing Current time (at call): " << current_time_us << " microseconds" << std::endl; + std::cout << " Testing Future time: " << future_time_us << " microseconds" << std::endl; + } + std::cout << std::endl; + + // Test 3: put_by_time with too old timestamp (should be rejected) + std::cout << "Test 3: put_by_time with too old timestamp (should be rejected)" << std::endl; + { + ObjectWithStringKey obj; + obj.key = "test_key_3"; + obj.blob = Blob(reinterpret_cast("test_value_3"), 12); + obj.previous_version = persistent::INVALID_VERSION; + obj.previous_version_by_key = persistent::INVALID_VERSION; + + // Use a timestamp 2 seconds in the past (older than 1 second delta) + uint64_t current_time_us = get_walltime() / 1000ULL; + uint64_t old_time_us = current_time_us - 2000000ULL; // 2 seconds = 2000000 microseconds + try { + auto result = capi.template put_by_time(obj, old_time_us, subgroup_index, shard_index, false); + // If we get here, the call didn't throw - check if result is empty + bool has_results = false; + for (auto& reply_future : result.get()) { + has_results = true; + auto reply = reply_future.second.get(); + std::cout << " ✗ Unexpected success! Version: " << std::get<0>(reply) + << ", Timestamp: " << std::get<1>(reply) << " microseconds" << std::endl; + } + if (!has_results) { + std::cout << " ✓ Correctly rejected (empty result)" << std::endl; + } + } catch (const derecho::derecho_exception& e) { + std::cout << " ✓ Correctly rejected with exception: " << e.what() << std::endl; + } + std::cout << " Testing Current time: " << current_time_us << " microseconds" << std::endl; + std::cout << " Old time: " << old_time_us << " microseconds" << std::endl; + } + std::cout << std::endl; + + // Test 4: Compare put_by_time vs regular put + std::cout << "Test 4: Compare put_by_time vs regular put" << std::endl; + { + // First, do a regular put + ObjectWithStringKey obj1; + obj1.key = "test_key_4a"; + obj1.blob = Blob(reinterpret_cast("test_value_4a"), 13); + obj1.previous_version = persistent::INVALID_VERSION; + obj1.previous_version_by_key = persistent::INVALID_VERSION; + + std::cout << " Regular put..." << std::endl; + auto result1 = capi.template put(obj1, subgroup_index, shard_index, false); + uint64_t regular_put_timestamp = 0; + for (auto& reply_future : result1.get()) { + auto reply = reply_future.second.get(); + regular_put_timestamp = std::get<1>(reply); + std::cout << " Timestamp: " << regular_put_timestamp << " microseconds" << std::endl; + } + + // Small delay to ensure different timestamps + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + // Now do put_by_time with a specific timestamp + ObjectWithStringKey obj2; + obj2.key = "test_key_4b"; + obj2.blob = Blob(reinterpret_cast("test_value_4b"), 13); + obj2.previous_version = persistent::INVALID_VERSION; + obj2.previous_version_by_key = persistent::INVALID_VERSION; + + // Use a timestamp that's guaranteed to be higher than the current HLC + // (regular_put_timestamp + 100ms to be safe) + uint64_t custom_timestamp_us = regular_put_timestamp + 100000ULL; // 100ms ahead of regular put + auto result2 = capi.template put_by_time(obj2, custom_timestamp_us, subgroup_index, shard_index, false); + uint64_t custom_put_timestamp = 0; + for (auto& reply_future : result2.get()) { + auto reply = reply_future.second.get(); + custom_put_timestamp = std::get<1>(reply); + std::cout << " Timestamp: " << custom_put_timestamp << " microseconds" << std::endl; + } + + // Verify the custom timestamp was used + if (custom_put_timestamp == custom_timestamp_us) { + std::cout << " ✓ Custom timestamp was correctly used!" << std::endl; + } else { + std::cout << " ✗ Error: Custom timestamp mismatch. Expected: " << custom_timestamp_us + << ", Got: " << custom_put_timestamp << std::endl; + } + std::cout << " Testing put_by_time with custom timestamp: " << custom_timestamp_us << " microseconds" << std::endl; + std::cout << " (100ms ahead of regular put to ensure HLC advances)" << std::endl; + } + std::cout << std::endl; + + // Test 5: put_by_time with specific timestamp value + std::cout << "Test 5: put_by_time with specific timestamp value" << std::endl; + { + ObjectWithStringKey obj; + obj.key = "test_key_5"; + obj.blob = Blob(reinterpret_cast("test_value_5"), 12); + obj.previous_version = persistent::INVALID_VERSION; + obj.previous_version_by_key = persistent::INVALID_VERSION; + + // Use a specific timestamp (e.g., 1000000000 microseconds = 1000 seconds) + uint64_t specific_timestamp_us = 1000000000ULL; + try { + auto result = capi.template put_by_time(obj, specific_timestamp_us, subgroup_index, shard_index, false); + bool has_results = false; + for (auto& reply_future : result.get()) { + has_results = true; + auto reply = reply_future.second.get(); + std::cout << " Result - Version: " << std::get<0>(reply) + << ", Timestamp: " << std::get<1>(reply) << " microseconds" << std::endl; + } + if (!has_results) { + std::cout << " Timestamp was rejected (empty result)" << std::endl; + } + } catch (const derecho::derecho_exception& e) { + std::cout << " Timestamp was rejected with exception: " << e.what() << std::endl; + } + std::cout << " Using specific timestamp: " << specific_timestamp_us << " microseconds" << std::endl; + std::cout << " Note: This will be rejected if it's too old" << std::endl; + } + std::cout << std::endl; + + std::cout << "=== All tests completed ===" << std::endl; + return 0; + + } catch (const std::exception& e) { + std::cerr << "Exception: " << e.what() << std::endl; + return -1; + } +} +