Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions include/cascade/cascade_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ class CriticalDataPathObserver : public derecho::DeserializationContext {
* @param[in] value The value of the K/V pair
* @param[in] cascade_ctxt The cascade context to be used later
* @param[in] is_trigger True for critical data path of `p2p_send`; otherwise, the critical data path of `ordered_send`.
*
* @return void
*/
virtual void operator()(const uint32_t subgroup_idx,
const uint32_t shard_idx,
Expand Down Expand Up @@ -111,8 +109,6 @@ class ICascadeStore {
*
* @param[in] value The K/V pair value
* @param[in] as_trigger The object will NOT be used to update the K/V state.
*
* @return void
*/
virtual void put_and_forget(const VT& value, bool as_trigger) const = 0;

Expand Down Expand Up @@ -149,7 +145,7 @@ class ICascadeStore {
* If `stable == false`, we only return the data reflecting the latest locally delivered atomic
* broadcast. Otherwise, stable data will be returned, meaning that the persisted states returned
* is safe: they will survive after whole system recovery.
* @param[in] exact
* @param[in] exact
* The exact match flag: this function try to return the value of that key at the 'ver'. If such a
* value does not exists and exact is true, it will throw an exception. If such a value does not
* exists and exact is false, it will return the latest state of the value for 'key' before 'ver'.
Expand Down Expand Up @@ -272,7 +268,7 @@ class ICascadeStore {
* @param[in] key The key
* @param[in] ver Version, if `ver == CURRENT_VERSION`, get the latest value.
* @param[in] stable
* @param[in] exact
* @param[in] exact
* The exact match flag: this function try to return the value of that key at the 'ver'. If such a
* value does not exists and exact is true, it will throw an exception. If such a value does not
* exists and exact is false, it will return the latest state of the value for 'key' before 'ver'.
Expand Down Expand Up @@ -427,7 +423,7 @@ VT create_null_object_cb(const KT& key = *IK);
* We use both the concepts of null and valid object in Cascade. A null object precisely means 'no data'; while a
* valid object literarily means an object is 'valid'. Technically, a null object has a valid key while invalid
* object does not.
*
*
* @tparam KT The key type.
* @tparam VT The value type.
*/
Expand Down Expand Up @@ -489,7 +485,7 @@ class IKeepVersion {

/**
* @brief The version getter
*
*
* Get the version
*
* @return The K/V object's version.
Expand Down
2 changes: 0 additions & 2 deletions include/cascade/detail/prefix_registry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ class PrefixRegistry {
*
* @param path - the full path.
* @param collector - the lambda function to collect values for all prefixes of a string.
*
* @return
*/
void collect_values_for_prefixes(const std::string& path,
const std::function<void(const std::string& prefix,const std::shared_ptr<T>& value)>& collector) const;
Expand Down
2 changes: 1 addition & 1 deletion include/cascade/detail/service_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2440,7 +2440,7 @@ void ExecutionEngine<CascadeTypes...>::workhorse(uint32_t worker_id, struct acti
dbg_default_trace("Cascade context workhorse[{}] started", worker_id);
while(is_running) {
// waiting for an action
Action action = std::move(aq.action_buffer_dequeue(is_running));
Action action = aq.action_buffer_dequeue(is_running);
// if action_buffer_dequeue return with is_running == false, value_ptr is invalid(nullptr).
action.fire(this,worker_id);

Expand Down
280 changes: 162 additions & 118 deletions include/cascade/service.hpp

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion include/cascade/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ class OpenLoopLatencyCollectorClient {
* @param id The event id for corresponding event.
* @param use_local_ts Using loca timestamp.
*
* @return N/A
*/
virtual void ack(uint32_t type, uint32_t id, bool use_local_ts = false) = 0;

Expand Down
5 changes: 4 additions & 1 deletion scripts/prerequisites/install-ann.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#!/bin/bash
INSTALL_PREFIX=${HOME}/opt-dev
INSTALL_PREFIX="/usr/local"
if [[ $# -gt 0 ]]; then
INSTALL_PREFIX=$1
fi

wget https://www.cs.umd.edu/~mount/ANN/Files/1.1.2/ann_1.1.2.tar.gz
tar -xf ann_1.1.2.tar.gz
Expand Down
16 changes: 16 additions & 0 deletions scripts/prerequisites/install-boolinq.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/usr/bin/env bash
set -eu
export TMPDIR=/var/tmp
WORKPATH=`mktemp -d`
INSTALL_PREFIX="/usr/local"
if [[ $# -gt 0 ]]; then
INSTALL_PREFIX=$1
fi

echo "Using INSTALL_PREFIX=${INSTALL_PREFIX}"

cd ${WORKPATH}
git clone https://github.com/k06a/boolinq.git
cd boolinq
cp -r include/ ${INSTALL_PREFIX}
rm -rf ${WORKPATH}
5 changes: 4 additions & 1 deletion scripts/prerequisites/install-cppflow.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#!/bin/bash
export INSTALL_PREFIX=$HOME/opt-dev/
INSTALL_PREFIX="/usr/local"
if [[ $# -gt 0 ]]; then
INSTALL_PREFIX=$1
fi
git clone https://github.com/serizba/cppflow.git
cd cppflow
git checkout 9ea9519c66d9b1893e2d298db8aa1ee866f903a2
Expand Down
7 changes: 5 additions & 2 deletions scripts/prerequisites/install-libtorch.sh
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
#!/bin/bash

if [ $# != 1 ]; then
if [ $# -lt 1 ]; then
echo "USAGE: $0 <cpu|gpu>"
exit 0
fi

INSTALL_PREFIX=${HOME}/opt-dev
INSTALL_PREFIX="/usr/local"
if [[ $# -gt 1 ]]; then
INSTALL_PREFIX=$2
fi
INSTALL_TYPE=$1
ZIP_FILE=libtorch-cxx11.zip
if [ $INSTALL_TYPE == 'cpu' ]; then
Expand Down
5 changes: 4 additions & 1 deletion scripts/prerequisites/install-opencv.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#!/bin/bash
export INSTALL_PREFIX=$HOME/opt-dev/
INSTALL_PREFIX="/usr/local"
if [[ $# -gt 0 ]]; then
INSTALL_PREFIX=$1
fi
# install python opencv
sudo apt-get -y install python3-opencv
# Download and unpack sources
Expand Down
5 changes: 4 additions & 1 deletion scripts/prerequisites/install-tensorflow.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ if [ $# -lt 2 ]; then
fi

# pip3 install tensorflow==${VERSION}
INSTALL_PREFIX=$HOME/opt-dev/
INSTALL_PREFIX="/usr/local"
if [[ $# -gt 2 ]]; then
INSTALL_PREFIX=$3
fi
VERSION=$1
ENGINE=$2
FILENAME=libtensorflow-${ENGINE}-linux-x86_64-${VERSION}.tar.gz
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ static std::vector<std::string> tokenize(std::string& line) {
}

static void client_help() {
static const char* HELP_STR =
static const char* HELP_STR =
"(v/p/t)put <object_id> <contents>\n"
" - Put an object\n"
"(v/p)get <object_id> [-t timestamp_in_us | -v version_number]\n"
Expand Down Expand Up @@ -67,7 +67,7 @@ static void client_put(derecho::ExternalGroupClient<VCS,PCS,TCS>& group,
}

uint64_t key = std::stoll(tokens[1]);

//TODO: the previous_version should be used to enforce version check. INVALID_VERSION disables the feature.
ObjectWithUInt64Key o(key,Blob(reinterpret_cast<const uint8_t*>(tokens[2].c_str()),tokens[2].size()));

Expand Down Expand Up @@ -95,7 +95,7 @@ static void client_trigger_put(derecho::ExternalGroupClient<VCS,PCS,TCS>& group,
}

uint64_t key = std::stoll(tokens[1]);

ObjectWithUInt64Key o(key,Blob(reinterpret_cast<const uint8_t*>(tokens[2].c_str()),tokens[2].size()));

ExternalClientCaller<TCS,std::remove_reference<decltype(group)>::type>& vcs_ec = group.get_subgroup_caller<TCS>();
Expand Down Expand Up @@ -203,7 +203,7 @@ static void client_remove(derecho::ExternalGroupClient<VCS,PCS,TCS>& group,
}

uint64_t key = std::stoll(tokens[1]);

if (is_persistent) {
ExternalClientCaller<PCS,std::remove_reference<decltype(group)>::type>& pcs_ec = group.get_subgroup_caller<PCS>();
auto result = pcs_ec.p2p_send<RPC_NAME(remove)>(member,key);
Expand Down Expand Up @@ -300,6 +300,7 @@ class PerfCDPO : public CriticalDataPathObserver<CascadeType> {
// @overload
void operator () (const uint32_t sgidx,
const uint32_t shidx,
const derecho::node_id_t sender_id,
const typename CascadeType::KeyType& key,
const typename CascadeType::ObjectType& value,
ICascadeContext* cascade_context,
Expand Down
8 changes: 5 additions & 3 deletions src/applications/tests/cascade_as_subgroup_classes/perf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,11 @@ class PerfCDPO: public CriticalDataPathObserver<CascadeType> {
// @override
virtual void operator () (const uint32_t sgidx,
const uint32_t shidx,
const derecho::node_id_t sender_id,
const typename CascadeType::KeyType& key,
const typename CascadeType::ObjectType& value,
void* cascade_context){
ICascadeContext* cascade_context,
bool is_trigger = false) {
dbg_default_info("Watcher is called with\n\tsubgroup idx = {},\n\tshard idx = {},\n\tkey = {},\n\tvalue = [hidden].", sgidx, shidx, key);
}
};
Expand Down Expand Up @@ -347,7 +349,7 @@ int do_client(int argc,char** args) {

for(uint64_t i = 0; i < num_messages; i++) {
ObjectWithUInt64Key o(randomize_key(i)%max_distinct_objects,Blob(bbuf, msg_size));
cs.do_send(i,[&o,&pcs_ec,&server_id](){return std::move(pcs_ec.p2p_send<RPC_NAME(put)>(server_id,o,false));});
cs.do_send(i,[&o,&pcs_ec,&server_id](){return pcs_ec.p2p_send<RPC_NAME(put)>(server_id,o,false);});
}
free(bbuf);

Expand All @@ -367,7 +369,7 @@ int do_client(int argc,char** args) {

for(uint64_t i = 0; i < num_messages; i++) {
ObjectWithUInt64Key o(randomize_key(i)%max_distinct_objects,Blob(bbuf, msg_size));
cs.do_send(i,[&o,&vcs_ec,&server_id](){return std::move(vcs_ec.p2p_send<RPC_NAME(put)>(server_id,o,false));});
cs.do_send(i,[&o,&vcs_ec,&server_id](){return vcs_ec.p2p_send<RPC_NAME(put)>(server_id,o,false);});
}
free(bbuf);

Expand Down
4 changes: 2 additions & 2 deletions src/service/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,10 +361,10 @@ void collective_trigger_put(ServiceClientAPI& capi, const std::string& key, cons
template <typename SubgroupType>
void remove(ServiceClientAPI& capi, const std::string& key, uint32_t subgroup_index, uint32_t shard_index) {
if constexpr (std::is_same<typename SubgroupType::KeyType,uint64_t>::value) {
derecho::rpc::QueryResults<derecho::cascade::version_tuple> result = std::move(capi.template remove<SubgroupType>(static_cast<uint64_t>(std::stol(key,nullptr,0)), subgroup_index, shard_index));
derecho::rpc::QueryResults<derecho::cascade::version_tuple> result = capi.template remove<SubgroupType>(static_cast<uint64_t>(std::stol(key,nullptr,0)), subgroup_index, shard_index);
check_put_and_remove_result(result);
} else if constexpr (std::is_same<typename SubgroupType::KeyType,std::string>::value) {
derecho::rpc::QueryResults<derecho::cascade::version_tuple> result = std::move(capi.template remove<SubgroupType>(key, subgroup_index, shard_index));
derecho::rpc::QueryResults<derecho::cascade::version_tuple> result = capi.template remove<SubgroupType>(key, subgroup_index, shard_index);
check_put_and_remove_result(result);
} else {
print_red(std::string("Unhandled KeyType:") + typeid(typename SubgroupType::KeyType).name());
Expand Down
Loading