Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 16 additions & 28 deletions source/common/config/datasource.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ absl::optional<std::string> getPath(const envoy::config::core::v3::DataSource& s

template <class DataType>
using DataTransform = std::function<absl::StatusOr<std::shared_ptr<DataType>>(absl::string_view)>;
using DataUpdateCb = std::function<void()>;

struct ProviderOptions {
// Use an empty string if no DataSource case is specified.
Expand All @@ -94,11 +93,9 @@ template <class DataType> class DynamicData {
ThreadLocal::SlotAllocator& tls, Api::Api& api,
DataTransform<DataType> data_transform_cb, const ProviderOptions& options,
std::shared_ptr<DataType> initial_data, uint64_t initial_hash,
absl::AnyInvocable<void()> cleanup, absl::Status& creation_status,
absl::optional<std::function<void()>> data_update_cb = absl::nullopt)
absl::AnyInvocable<void()> cleanup, absl::Status& creation_status)
: dispatcher_(main_dispatcher), api_(api), options_(options), filename_(source.filename()),
data_transform_cb_(data_transform_cb), data_update_cb_(data_update_cb), hash_(initial_hash),
cleanup_(std::move(cleanup)) {
data_transform_cb_(data_transform_cb), hash_(initial_hash), cleanup_(std::move(cleanup)) {
slot_ =
ThreadLocal::TypedSlot<typename DynamicData<DataType>::ThreadLocalData>::makeUnique(tls);
slot_->set([initial_data = std::move(initial_data)](Event::Dispatcher&) {
Expand Down Expand Up @@ -163,14 +160,11 @@ template <class DataType> class DynamicData {
hash_ = new_hash;
}

slot_->runOnAllThreads([new_data = std::move(transformed_new_data_or_error.value()),
this](OptRef<typename DynamicData<DataType>::ThreadLocalData> obj) {
slot_->runOnAllThreads([new_data = std::move(transformed_new_data_or_error.value())](
OptRef<typename DynamicData<DataType>::ThreadLocalData> obj) {
if (obj.has_value()) {
obj->data_ = new_data;
}
if (data_update_cb_.has_value()) {
(*data_update_cb_)();
}
});
return absl::OkStatus();
}
Expand All @@ -180,7 +174,6 @@ template <class DataType> class DynamicData {
const ProviderOptions options_;
const std::string filename_;
DataTransform<DataType> data_transform_cb_;
absl::optional<DataUpdateCb> data_update_cb_;
uint64_t hash_;
absl::AnyInvocable<void()> cleanup_;
ThreadLocal::TypedSlotPtr<ThreadLocalData> slot_;
Expand Down Expand Up @@ -211,7 +204,6 @@ template <class DataType> class DataSourceProvider {
* @param data_transform_cb transforms content of the DataSource (type std::string)
* to the desired `DataType` type.
* @param max_size max size limit of file to read, default 0 means no limit.
* @param data_update_cb optional callback that can be invoked upon data update in the DataSource.
* @return absl::StatusOr<DataSourceProvider> with DataSource contents. or an error
* status if any error occurs.
* NOTE: If file watch is enabled and the new file content does not meet the
Expand All @@ -220,17 +212,15 @@ template <class DataType> class DataSourceProvider {
static absl::StatusOr<DataSourceProviderPtr<DataType>>
create(const ProtoDataSource& source, Event::Dispatcher& main_dispatcher,
ThreadLocal::SlotAllocator& tls, Api::Api& api, bool allow_empty,
DataTransform<DataType> data_transform_cb, uint64_t max_size,
absl::optional<DataUpdateCb> data_update_cb = absl::nullopt) {
DataTransform<DataType> data_transform_cb, uint64_t max_size) {
return create(source, main_dispatcher, tls, api, data_transform_cb,
{.allow_empty = allow_empty, .max_size = max_size}, {}, data_update_cb);
{.allow_empty = allow_empty, .max_size = max_size}, {});
}

static absl::StatusOr<DataSourceProviderPtr<DataType>>
create(const ProtoDataSource& source, Event::Dispatcher& main_dispatcher,
ThreadLocal::SlotAllocator& tls, Api::Api& api, DataTransform<DataType> data_transform_cb,
const ProviderOptions& options, absl::AnyInvocable<void()> cleanup = {},
absl::optional<DataUpdateCb> data_update_cb = absl::nullopt) {
const ProviderOptions& options, absl::AnyInvocable<void()> cleanup = {}) {
uint64_t max_size = options.max_size;
auto initial_data_or_error = read(source, options.allow_empty, api, max_size);
RETURN_IF_NOT_OK_REF(initial_data_or_error.status());
Expand All @@ -253,11 +243,11 @@ template <class DataType> class DataSourceProvider {

absl::Status creation_status = absl::OkStatus();
const uint64_t hash = options.hash_content ? HashUtil::xxHash64(*initial_data_or_error) : 0;
auto ret = std::unique_ptr<DataSourceProvider>(
new DataSourceProvider<DataType>(std::make_unique<DynamicData<DataType>>(
source, main_dispatcher, tls, api, data_transform_cb, options,
std::move(transformed_data_or_error).value(), hash, std::move(cleanup), creation_status,
data_update_cb)));
auto ret = std::unique_ptr<DataSourceProvider>(new DataSourceProvider<DataType>(
std::make_unique<DynamicData<DataType>>(source, main_dispatcher, tls, api,
data_transform_cb, options,
std::move(transformed_data_or_error).value(), hash,
std::move(cleanup), creation_status)));
RETURN_IF_NOT_OK(creation_status);
return std::move(ret);
}
Expand Down Expand Up @@ -301,16 +291,15 @@ class ProviderSingleton : public Singleton::Instance,
public:
ProviderSingleton(Event::Dispatcher& main_dispatcher, ThreadLocal::SlotAllocator& tls,
Api::Api& api, DataTransform<DataType> data_transform_cb,
const ProviderOptions& options,
absl::optional<DataUpdateCb> data_update_cb = absl::nullopt)
const ProviderOptions& options)
: dispatcher_(main_dispatcher), tls_(tls), api_(api), data_transform_cb_(data_transform_cb),
data_update_cb_(data_update_cb), options_(options) {}
options_(options) {}

absl::StatusOr<DataSourceProviderSharedPtr<DataType>> getOrCreate(const ProtoDataSource& source) {
ASSERT_IS_MAIN_OR_TEST_THREAD();
if (!usesFileWatching(source, options_)) {
return DataSourceProvider<DataType>::create(
source, dispatcher_, tls_, api_, data_transform_cb_, options_, {}, data_update_cb_);
return DataSourceProvider<DataType>::create(source, dispatcher_, tls_, api_,
data_transform_cb_, options_, {});
}
const size_t config_hash = MessageUtil::hash(source);
auto it = dynamic_providers_.find(config_hash);
Expand Down Expand Up @@ -347,7 +336,6 @@ class ProviderSingleton : public Singleton::Instance,
ThreadLocal::SlotAllocator& tls_;
Api::Api& api_;
DataTransform<DataType> data_transform_cb_;
absl::optional<DataUpdateCb> data_update_cb_;
const ProviderOptions options_;
absl::flat_hash_map<size_t, std::weak_ptr<DataSourceProvider<DataType>>> dynamic_providers_;
};
Expand Down
Loading
Loading