Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,13 @@ new_features:
change: |
Added ``envoy_dynamic_module_callback_is_validation_mode`` ABI callback that allows dynamic
modules to check if the server is running in config validation mode.
- area: dynamic_modules
change: |
Added ``envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes`` and
``envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed`` ABI callbacks so
that a dynamic-module cluster's load balancer can read filter state set by an upstream HTTP
filter (or any other producer) when picking a host. The Rust SDK exposes these as
``ClusterLbContext::get_filter_state_bytes`` and ``ClusterLbContext::get_filter_state_typed``.
- area: access_log
change: |
Supported the singleton stats scope in the :ref:`stats access logger <envoy_v3_api_msg_extensions.access_loggers.stats.v3.Config>`.
Expand Down
1 change: 1 addition & 0 deletions source/extensions/clusters/dynamic_modules/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ envoy_cc_library(
"//source/common/http:message_lib",
"//source/common/network:address_lib",
"//source/common/network:utility_lib",
"//source/common/router:string_accessor_lib",
"//source/common/stats:utility_lib",
"//source/common/upstream:cluster_factory_lib",
"//source/common/upstream:upstream_includes",
Expand Down
65 changes: 65 additions & 0 deletions source/extensions/clusters/dynamic_modules/abi_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "source/common/common/assert.h"
#include "source/common/common/thread.h"
#include "source/common/http/message_impl.h"
#include "source/common/router/string_accessor_impl.h"
#include "source/extensions/clusters/dynamic_modules/cluster.h"
#include "source/extensions/dynamic_modules/abi/abi.h"

Expand Down Expand Up @@ -831,6 +832,70 @@ bool envoy_dynamic_module_callback_cluster_lb_context_get_downstream_connection_
return true;
}

bool envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes(
envoy_dynamic_module_type_cluster_lb_context_envoy_ptr context_envoy_ptr,
envoy_dynamic_module_type_module_buffer key, envoy_dynamic_module_type_envoy_buffer* result) {
if (context_envoy_ptr == nullptr || result == nullptr) {
return false;
}
auto* stream_info = getContext(context_envoy_ptr)->requestStreamInfo();
if (!stream_info) {
ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::dynamic_modules), debug,
"stream info is not available");
return false;
}
absl::string_view key_view(key.ptr, key.length);
auto filter_state =
stream_info->filterState()->getDataReadOnly<Envoy::Router::StringAccessor>(key_view);
if (!filter_state) {
ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::dynamic_modules), debug,
"key '{}' not found in filter state", key_view);
return false;
}
absl::string_view str = filter_state->asString();
*result = {const_cast<char*>(str.data()), str.size()};
return true;
}

bool envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed(
envoy_dynamic_module_type_cluster_lb_context_envoy_ptr context_envoy_ptr,
envoy_dynamic_module_type_module_buffer key, envoy_dynamic_module_type_envoy_buffer* result) {
if (context_envoy_ptr == nullptr || result == nullptr) {
return false;
}
auto* stream_info = getContext(context_envoy_ptr)->requestStreamInfo();
if (!stream_info) {
ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::dynamic_modules), debug,
"stream info is not available");
return false;
}

absl::string_view key_view(key.ptr, key.length);
const auto* object = stream_info->filterState()->getDataReadOnlyGeneric(key_view);
if (object == nullptr) {
ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::dynamic_modules), debug,
"key '{}' not found in filter state", key_view);
return false;
}

auto serialized = object->serializeAsString();
if (!serialized.has_value()) {
ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::dynamic_modules), debug,
"filter state object for key '{}' does not support serialization",
key_view);
return false;
}

// Cluster host selection runs on a worker thread serially per request. We stash the
// serialized buffer on a thread-local so its address survives until the next call to
// this function on the same thread (matching the documented lifetime in abi.h).
thread_local std::string last_serialized_filter_state;
last_serialized_filter_state = std::move(serialized.value());
result->ptr = const_cast<char*>(last_serialized_filter_state.data());
result->length = last_serialized_filter_state.size();
return true;
}

envoy_dynamic_module_type_cluster_scheduler_module_ptr
envoy_dynamic_module_callback_cluster_scheduler_new(
envoy_dynamic_module_type_cluster_envoy_ptr cluster_envoy_ptr) {
Expand Down
42 changes: 42 additions & 0 deletions source/extensions/dynamic_modules/abi/abi.h
Original file line number Diff line number Diff line change
Expand Up @@ -9438,6 +9438,48 @@ bool envoy_dynamic_module_callback_cluster_lb_context_get_downstream_connection_
envoy_dynamic_module_type_cluster_lb_context_envoy_ptr context_envoy_ptr,
envoy_dynamic_module_type_envoy_buffer* result_buffer);

/**
* envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes is called by the module
* to get the bytes value of the request's filter state with the given key. This lets a
* dynamic-module cluster consume filter state set by an upstream HTTP filter (or any other
* producer that stores a ``Router::StringAccessor``) when picking a host. If the filter state is
* not accessible, the key does not exist, or the value is not a ``Router::StringAccessor``, this
* returns false.
*
* @param context_envoy_ptr is the per-request load balancer context.
* @param key is the key of the filter state.
* @param result is the pointer to the pointer variable where the pointer to the buffer
* of the bytes value will be stored.
* @return true if the operation is successful, false otherwise.
*
* Note that the buffer pointed by the pointer stored in result is owned by Envoy, and
* is guaranteed to be valid for the duration of the current host-selection callback.
*/
bool envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes(
envoy_dynamic_module_type_cluster_lb_context_envoy_ptr context_envoy_ptr,
envoy_dynamic_module_type_module_buffer key, envoy_dynamic_module_type_envoy_buffer* result);

/**
* envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed is called by the module
* to get the serialized bytes value of a typed filter state object with the given key. It
* retrieves the object generically and calls serializeAsString to get the bytes representation,
* so it works with any filter state object type (e.g. one set by an upstream HTTP filter through
* a registered ``ObjectFactory``), not just ``Router::StringAccessor``.
*
* @param context_envoy_ptr is the per-request load balancer context.
* @param key is the key of the filter state.
* @param result is the pointer to the buffer where the serialized value will be stored.
* @return true if the operation is successful, false if the stream info is not available, the key
* does not exist, or the object does not support serialization.
*
* Note that the buffer pointed by the pointer stored in result is owned by Envoy, and is
* guaranteed to be valid until the next invocation of this callback on the same worker thread
* or until the end of the current host-selection callback, whichever comes first.
*/
bool envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed(
envoy_dynamic_module_type_cluster_lb_context_envoy_ptr context_envoy_ptr,
envoy_dynamic_module_type_module_buffer key, envoy_dynamic_module_type_envoy_buffer* result);

/**
* envoy_dynamic_module_callback_cluster_lb_async_host_selection_complete is called by the module
* to deliver the result of an asynchronous host selection. This must be called exactly once for
Expand Down
16 changes: 16 additions & 0 deletions source/extensions/dynamic_modules/abi_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,22 @@ envoy_dynamic_module_callback_cluster_lb_context_get_downstream_connection_sni(
return false;
}

__attribute__((weak)) bool envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes(
envoy_dynamic_module_type_cluster_lb_context_envoy_ptr, envoy_dynamic_module_type_module_buffer,
envoy_dynamic_module_type_envoy_buffer*) {
IS_ENVOY_BUG("envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes: "
"not implemented in this context");
return false;
}

__attribute__((weak)) bool envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed(
envoy_dynamic_module_type_cluster_lb_context_envoy_ptr, envoy_dynamic_module_type_module_buffer,
envoy_dynamic_module_type_envoy_buffer*) {
IS_ENVOY_BUG("envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed: "
"not implemented in this context");
return false;
}

__attribute__((weak)) envoy_dynamic_module_type_cluster_scheduler_module_ptr
envoy_dynamic_module_callback_cluster_scheduler_new(envoy_dynamic_module_type_cluster_envoy_ptr) {
IS_ENVOY_BUG("envoy_dynamic_module_callback_cluster_scheduler_new: "
Expand Down
68 changes: 65 additions & 3 deletions source/extensions/dynamic_modules/sdk/rust/src/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::buffer::EnvoyBuffer;
use crate::{
abi, drop_wrapped_c_void_ptr, str_to_module_buffer, strs_to_module_buffers, wrap_into_c_void_ptr,
CompletionCallback, EnvoyCounterId, EnvoyCounterVecId, EnvoyGaugeId, EnvoyGaugeVecId,
EnvoyHistogramId, EnvoyHistogramVecId, NEW_CLUSTER_CONFIG_FUNCTION,
abi, bytes_to_module_buffer, drop_wrapped_c_void_ptr, str_to_module_buffer,
strs_to_module_buffers, wrap_into_c_void_ptr, CompletionCallback, EnvoyCounterId,
EnvoyCounterVecId, EnvoyGaugeId, EnvoyGaugeVecId, EnvoyHistogramId, EnvoyHistogramVecId,
NEW_CLUSTER_CONFIG_FUNCTION,
};
use mockall::*;
use std::panic::{catch_unwind, AssertUnwindSafe};
Expand Down Expand Up @@ -226,6 +227,29 @@ pub trait ClusterLbContext {
///
/// Returns `None` if the downstream connection or SNI is not available.
fn get_downstream_connection_sni(&self) -> Option<String>;

/// Returns the bytes value of a `Router::StringAccessor` filter state stored on the request.
///
/// This lets a cluster consume filter state that an upstream HTTP filter set via
/// `EnvoyHttpFilter::set_filter_state_bytes` (or anything else that stores a `StringAccessor`)
/// to make a host-selection decision.
///
/// Returns `None` if the request has no stream info, the key is not present, or the stored
/// value is not a `StringAccessor`. The returned buffer borrows from Envoy and is valid for
/// the duration of the current host-selection callback.
fn get_filter_state_bytes<'a>(&'a self, key: &[u8]) -> Option<EnvoyBuffer<'a>>;

/// Returns the serialized bytes of a typed filter state object stored on the request.
///
/// Works for any filter state object whose registered `ObjectFactory` produces an object that
/// supports `serializeAsString` — e.g. one set by an upstream HTTP filter via
/// `EnvoyHttpFilter::set_filter_state_typed`.
///
/// Returns `None` if the request has no stream info, the key is not present, or the object
/// does not support serialization. The returned buffer borrows from Envoy and is valid until
/// the next call to `get_filter_state_typed` on the same worker thread, or until the end of
/// the current host-selection callback, whichever comes first.
fn get_filter_state_typed<'a>(&'a self, key: &[u8]) -> Option<EnvoyBuffer<'a>>;
}

/// Envoy-side cluster operations available to the module.
Expand Down Expand Up @@ -1830,6 +1854,44 @@ impl ClusterLbContext for ClusterLbContextImpl {
};
Some(sni.into_owned())
}

fn get_filter_state_bytes(&self, key: &[u8]) -> Option<EnvoyBuffer<'_>> {
let mut result = abi::envoy_dynamic_module_type_envoy_buffer {
ptr: std::ptr::null_mut(),
length: 0,
};
let success = unsafe {
abi::envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes(
self.raw_context,
bytes_to_module_buffer(key),
&mut result,
)
};
if success {
Some(unsafe { EnvoyBuffer::new_from_raw(result.ptr as *const _, result.length) })
} else {
None
}
}

fn get_filter_state_typed(&self, key: &[u8]) -> Option<EnvoyBuffer<'_>> {
let mut result = abi::envoy_dynamic_module_type_envoy_buffer {
ptr: std::ptr::null_mut(),
length: 0,
};
let success = unsafe {
abi::envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed(
self.raw_context,
bytes_to_module_buffer(key),
&mut result,
)
};
if success {
Some(unsafe { EnvoyBuffer::new_from_raw(result.ptr as *const _, result.length) })
} else {
None
}
}
}

// Cluster Event Hook Implementations
Expand Down
18 changes: 18 additions & 0 deletions source/extensions/dynamic_modules/sdk/rust/src/lib_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4483,6 +4483,24 @@ pub extern "C" fn envoy_dynamic_module_callback_cluster_lb_context_get_downstrea
false
}

#[no_mangle]
pub extern "C" fn envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes(
_context_envoy_ptr: abi::envoy_dynamic_module_type_cluster_lb_context_envoy_ptr,
_key: abi::envoy_dynamic_module_type_module_buffer,
_result: *mut abi::envoy_dynamic_module_type_envoy_buffer,
) -> bool {
false
}

#[no_mangle]
pub extern "C" fn envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed(
_context_envoy_ptr: abi::envoy_dynamic_module_type_cluster_lb_context_envoy_ptr,
_key: abi::envoy_dynamic_module_type_module_buffer,
_result: *mut abi::envoy_dynamic_module_type_envoy_buffer,
) -> bool {
false
}

#[no_mangle]
pub extern "C" fn envoy_dynamic_module_callback_cluster_http_callout(
_cluster_envoy_ptr: abi::envoy_dynamic_module_type_cluster_envoy_ptr,
Expand Down
6 changes: 6 additions & 0 deletions test/extensions/clusters/dynamic_modules/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ envoy_cc_test(
deps = [
"//source/common/config:metadata_lib",
"//source/common/http:message_lib",
"//source/common/router:string_accessor_lib",
"//source/common/stream_info:filter_state_lib",
"//source/extensions/clusters/dynamic_modules:cluster_lib",
"//source/extensions/dynamic_modules:abi_impl",
"//source/extensions/load_balancing_policies/cluster_provided:config",
Expand All @@ -30,6 +32,7 @@ envoy_cc_test(
"//test/mocks/http:http_mocks",
"//test/mocks/network:network_mocks",
"//test/mocks/server:server_factory_context_mocks",
"//test/mocks/stream_info:stream_info_mocks",
"//test/mocks/upstream:cluster_manager_mocks",
"//test/mocks/upstream:load_balancer_context_mock",
"//test/mocks/upstream:priority_set_mocks",
Expand All @@ -46,12 +49,15 @@ envoy_cc_test(
name = "integration_test",
srcs = ["integration_test.cc"],
data = [
"//test/extensions/dynamic_modules/test_data/rust:cluster_filter_state_test",
"//test/extensions/dynamic_modules/test_data/rust:cluster_integration_test",
],
rbe_pool = "6gig",
deps = [
"//source/common/router:string_accessor_lib",
"//source/extensions/clusters/dynamic_modules:cluster",
"//source/extensions/dynamic_modules:abi_impl",
"//source/extensions/filters/http/dynamic_modules:factory_registration",
"//source/extensions/load_balancing_policies/cluster_provided:config",
"//test/extensions/dynamic_modules:util",
"//test/integration:http_integration_lib",
Expand Down
Loading
Loading