diff --git a/changelogs/current.yaml b/changelogs/current.yaml index 4a51686156567..dde2688ff14b8 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -134,6 +134,13 @@ new_features: change: | Added ``envoy_dynamic_module_callback_is_validation_mode`` ABI callback that allows dynamic modules to check if the server is running in config validation mode. +- area: dynamic_modules + change: | + Added ``envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes`` and + ``envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed`` ABI callbacks so + that a dynamic-module cluster's load balancer can read filter state set by an upstream HTTP + filter (or any other producer) when picking a host. The Rust SDK exposes these as + ``ClusterLbContext::get_filter_state_bytes`` and ``ClusterLbContext::get_filter_state_typed``. - area: access_log change: | Supported the singleton stats scope in the :ref:`stats access logger `. diff --git a/source/extensions/clusters/dynamic_modules/BUILD b/source/extensions/clusters/dynamic_modules/BUILD index eb432c53a167e..bf429e7a3973d 100644 --- a/source/extensions/clusters/dynamic_modules/BUILD +++ b/source/extensions/clusters/dynamic_modules/BUILD @@ -25,6 +25,7 @@ envoy_cc_library( "//source/common/http:message_lib", "//source/common/network:address_lib", "//source/common/network:utility_lib", + "//source/common/router:string_accessor_lib", "//source/common/stats:utility_lib", "//source/common/upstream:cluster_factory_lib", "//source/common/upstream:upstream_includes", diff --git a/source/extensions/clusters/dynamic_modules/abi_impl.cc b/source/extensions/clusters/dynamic_modules/abi_impl.cc index 13bc58819c8ca..c9bca9324c330 100644 --- a/source/extensions/clusters/dynamic_modules/abi_impl.cc +++ b/source/extensions/clusters/dynamic_modules/abi_impl.cc @@ -5,6 +5,7 @@ #include "source/common/common/assert.h" #include "source/common/common/thread.h" #include "source/common/http/message_impl.h" +#include "source/common/router/string_accessor_impl.h" #include "source/extensions/clusters/dynamic_modules/cluster.h" #include "source/extensions/dynamic_modules/abi/abi.h" @@ -831,6 +832,70 @@ bool envoy_dynamic_module_callback_cluster_lb_context_get_downstream_connection_ return true; } +bool envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes( + envoy_dynamic_module_type_cluster_lb_context_envoy_ptr context_envoy_ptr, + envoy_dynamic_module_type_module_buffer key, envoy_dynamic_module_type_envoy_buffer* result) { + if (context_envoy_ptr == nullptr || result == nullptr) { + return false; + } + auto* stream_info = getContext(context_envoy_ptr)->requestStreamInfo(); + if (!stream_info) { + ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::dynamic_modules), debug, + "stream info is not available"); + return false; + } + absl::string_view key_view(key.ptr, key.length); + auto filter_state = + stream_info->filterState()->getDataReadOnly(key_view); + if (!filter_state) { + ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::dynamic_modules), debug, + "key '{}' not found in filter state", key_view); + return false; + } + absl::string_view str = filter_state->asString(); + *result = {const_cast(str.data()), str.size()}; + return true; +} + +bool envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed( + envoy_dynamic_module_type_cluster_lb_context_envoy_ptr context_envoy_ptr, + envoy_dynamic_module_type_module_buffer key, envoy_dynamic_module_type_envoy_buffer* result) { + if (context_envoy_ptr == nullptr || result == nullptr) { + return false; + } + auto* stream_info = getContext(context_envoy_ptr)->requestStreamInfo(); + if (!stream_info) { + ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::dynamic_modules), debug, + "stream info is not available"); + return false; + } + + absl::string_view key_view(key.ptr, key.length); + const auto* object = stream_info->filterState()->getDataReadOnlyGeneric(key_view); + if (object == nullptr) { + ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::dynamic_modules), debug, + "key '{}' not found in filter state", key_view); + return false; + } + + auto serialized = object->serializeAsString(); + if (!serialized.has_value()) { + ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::dynamic_modules), debug, + "filter state object for key '{}' does not support serialization", + key_view); + return false; + } + + // Cluster host selection runs on a worker thread serially per request. We stash the + // serialized buffer on a thread-local so its address survives until the next call to + // this function on the same thread (matching the documented lifetime in abi.h). + thread_local std::string last_serialized_filter_state; + last_serialized_filter_state = std::move(serialized.value()); + result->ptr = const_cast(last_serialized_filter_state.data()); + result->length = last_serialized_filter_state.size(); + return true; +} + envoy_dynamic_module_type_cluster_scheduler_module_ptr envoy_dynamic_module_callback_cluster_scheduler_new( envoy_dynamic_module_type_cluster_envoy_ptr cluster_envoy_ptr) { diff --git a/source/extensions/dynamic_modules/abi/abi.h b/source/extensions/dynamic_modules/abi/abi.h index a2c63c472df9d..b6ab7f493f29d 100644 --- a/source/extensions/dynamic_modules/abi/abi.h +++ b/source/extensions/dynamic_modules/abi/abi.h @@ -9438,6 +9438,48 @@ bool envoy_dynamic_module_callback_cluster_lb_context_get_downstream_connection_ envoy_dynamic_module_type_cluster_lb_context_envoy_ptr context_envoy_ptr, envoy_dynamic_module_type_envoy_buffer* result_buffer); +/** + * envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes is called by the module + * to get the bytes value of the request's filter state with the given key. This lets a + * dynamic-module cluster consume filter state set by an upstream HTTP filter (or any other + * producer that stores a ``Router::StringAccessor``) when picking a host. If the filter state is + * not accessible, the key does not exist, or the value is not a ``Router::StringAccessor``, this + * returns false. + * + * @param context_envoy_ptr is the per-request load balancer context. + * @param key is the key of the filter state. + * @param result is the pointer to the pointer variable where the pointer to the buffer + * of the bytes value will be stored. + * @return true if the operation is successful, false otherwise. + * + * Note that the buffer pointed by the pointer stored in result is owned by Envoy, and + * is guaranteed to be valid for the duration of the current host-selection callback. + */ +bool envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes( + envoy_dynamic_module_type_cluster_lb_context_envoy_ptr context_envoy_ptr, + envoy_dynamic_module_type_module_buffer key, envoy_dynamic_module_type_envoy_buffer* result); + +/** + * envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed is called by the module + * to get the serialized bytes value of a typed filter state object with the given key. It + * retrieves the object generically and calls serializeAsString to get the bytes representation, + * so it works with any filter state object type (e.g. one set by an upstream HTTP filter through + * a registered ``ObjectFactory``), not just ``Router::StringAccessor``. + * + * @param context_envoy_ptr is the per-request load balancer context. + * @param key is the key of the filter state. + * @param result is the pointer to the buffer where the serialized value will be stored. + * @return true if the operation is successful, false if the stream info is not available, the key + * does not exist, or the object does not support serialization. + * + * Note that the buffer pointed by the pointer stored in result is owned by Envoy, and is + * guaranteed to be valid until the next invocation of this callback on the same worker thread + * or until the end of the current host-selection callback, whichever comes first. + */ +bool envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed( + envoy_dynamic_module_type_cluster_lb_context_envoy_ptr context_envoy_ptr, + envoy_dynamic_module_type_module_buffer key, envoy_dynamic_module_type_envoy_buffer* result); + /** * envoy_dynamic_module_callback_cluster_lb_async_host_selection_complete is called by the module * to deliver the result of an asynchronous host selection. This must be called exactly once for diff --git a/source/extensions/dynamic_modules/abi_impl.cc b/source/extensions/dynamic_modules/abi_impl.cc index 062c8c8c1d15e..ef30d3622774c 100644 --- a/source/extensions/dynamic_modules/abi_impl.cc +++ b/source/extensions/dynamic_modules/abi_impl.cc @@ -745,6 +745,22 @@ envoy_dynamic_module_callback_cluster_lb_context_get_downstream_connection_sni( return false; } +__attribute__((weak)) bool envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes( + envoy_dynamic_module_type_cluster_lb_context_envoy_ptr, envoy_dynamic_module_type_module_buffer, + envoy_dynamic_module_type_envoy_buffer*) { + IS_ENVOY_BUG("envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes: " + "not implemented in this context"); + return false; +} + +__attribute__((weak)) bool envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed( + envoy_dynamic_module_type_cluster_lb_context_envoy_ptr, envoy_dynamic_module_type_module_buffer, + envoy_dynamic_module_type_envoy_buffer*) { + IS_ENVOY_BUG("envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed: " + "not implemented in this context"); + return false; +} + __attribute__((weak)) envoy_dynamic_module_type_cluster_scheduler_module_ptr envoy_dynamic_module_callback_cluster_scheduler_new(envoy_dynamic_module_type_cluster_envoy_ptr) { IS_ENVOY_BUG("envoy_dynamic_module_callback_cluster_scheduler_new: " diff --git a/source/extensions/dynamic_modules/sdk/rust/src/cluster.rs b/source/extensions/dynamic_modules/sdk/rust/src/cluster.rs index f74a2c8df0584..78329966e45f9 100644 --- a/source/extensions/dynamic_modules/sdk/rust/src/cluster.rs +++ b/source/extensions/dynamic_modules/sdk/rust/src/cluster.rs @@ -1,8 +1,9 @@ use crate::buffer::EnvoyBuffer; use crate::{ - abi, drop_wrapped_c_void_ptr, str_to_module_buffer, strs_to_module_buffers, wrap_into_c_void_ptr, - CompletionCallback, EnvoyCounterId, EnvoyCounterVecId, EnvoyGaugeId, EnvoyGaugeVecId, - EnvoyHistogramId, EnvoyHistogramVecId, NEW_CLUSTER_CONFIG_FUNCTION, + abi, bytes_to_module_buffer, drop_wrapped_c_void_ptr, str_to_module_buffer, + strs_to_module_buffers, wrap_into_c_void_ptr, CompletionCallback, EnvoyCounterId, + EnvoyCounterVecId, EnvoyGaugeId, EnvoyGaugeVecId, EnvoyHistogramId, EnvoyHistogramVecId, + NEW_CLUSTER_CONFIG_FUNCTION, }; use mockall::*; use std::panic::{catch_unwind, AssertUnwindSafe}; @@ -226,6 +227,29 @@ pub trait ClusterLbContext { /// /// Returns `None` if the downstream connection or SNI is not available. fn get_downstream_connection_sni(&self) -> Option; + + /// Returns the bytes value of a `Router::StringAccessor` filter state stored on the request. + /// + /// This lets a cluster consume filter state that an upstream HTTP filter set via + /// `EnvoyHttpFilter::set_filter_state_bytes` (or anything else that stores a `StringAccessor`) + /// to make a host-selection decision. + /// + /// Returns `None` if the request has no stream info, the key is not present, or the stored + /// value is not a `StringAccessor`. The returned buffer borrows from Envoy and is valid for + /// the duration of the current host-selection callback. + fn get_filter_state_bytes<'a>(&'a self, key: &[u8]) -> Option>; + + /// Returns the serialized bytes of a typed filter state object stored on the request. + /// + /// Works for any filter state object whose registered `ObjectFactory` produces an object that + /// supports `serializeAsString` — e.g. one set by an upstream HTTP filter via + /// `EnvoyHttpFilter::set_filter_state_typed`. + /// + /// Returns `None` if the request has no stream info, the key is not present, or the object + /// does not support serialization. The returned buffer borrows from Envoy and is valid until + /// the next call to `get_filter_state_typed` on the same worker thread, or until the end of + /// the current host-selection callback, whichever comes first. + fn get_filter_state_typed<'a>(&'a self, key: &[u8]) -> Option>; } /// Envoy-side cluster operations available to the module. @@ -1830,6 +1854,44 @@ impl ClusterLbContext for ClusterLbContextImpl { }; Some(sni.into_owned()) } + + fn get_filter_state_bytes(&self, key: &[u8]) -> Option> { + let mut result = abi::envoy_dynamic_module_type_envoy_buffer { + ptr: std::ptr::null_mut(), + length: 0, + }; + let success = unsafe { + abi::envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes( + self.raw_context, + bytes_to_module_buffer(key), + &mut result, + ) + }; + if success { + Some(unsafe { EnvoyBuffer::new_from_raw(result.ptr as *const _, result.length) }) + } else { + None + } + } + + fn get_filter_state_typed(&self, key: &[u8]) -> Option> { + let mut result = abi::envoy_dynamic_module_type_envoy_buffer { + ptr: std::ptr::null_mut(), + length: 0, + }; + let success = unsafe { + abi::envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed( + self.raw_context, + bytes_to_module_buffer(key), + &mut result, + ) + }; + if success { + Some(unsafe { EnvoyBuffer::new_from_raw(result.ptr as *const _, result.length) }) + } else { + None + } + } } // Cluster Event Hook Implementations diff --git a/source/extensions/dynamic_modules/sdk/rust/src/lib_test.rs b/source/extensions/dynamic_modules/sdk/rust/src/lib_test.rs index 4b8a808dd446b..dc6734f0a03f9 100644 --- a/source/extensions/dynamic_modules/sdk/rust/src/lib_test.rs +++ b/source/extensions/dynamic_modules/sdk/rust/src/lib_test.rs @@ -4483,6 +4483,24 @@ pub extern "C" fn envoy_dynamic_module_callback_cluster_lb_context_get_downstrea false } +#[no_mangle] +pub extern "C" fn envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes( + _context_envoy_ptr: abi::envoy_dynamic_module_type_cluster_lb_context_envoy_ptr, + _key: abi::envoy_dynamic_module_type_module_buffer, + _result: *mut abi::envoy_dynamic_module_type_envoy_buffer, +) -> bool { + false +} + +#[no_mangle] +pub extern "C" fn envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed( + _context_envoy_ptr: abi::envoy_dynamic_module_type_cluster_lb_context_envoy_ptr, + _key: abi::envoy_dynamic_module_type_module_buffer, + _result: *mut abi::envoy_dynamic_module_type_envoy_buffer, +) -> bool { + false +} + #[no_mangle] pub extern "C" fn envoy_dynamic_module_callback_cluster_http_callout( _cluster_envoy_ptr: abi::envoy_dynamic_module_type_cluster_envoy_ptr, diff --git a/test/extensions/clusters/dynamic_modules/BUILD b/test/extensions/clusters/dynamic_modules/BUILD index eb99f547812ba..8838ad0139ffb 100644 --- a/test/extensions/clusters/dynamic_modules/BUILD +++ b/test/extensions/clusters/dynamic_modules/BUILD @@ -21,6 +21,8 @@ envoy_cc_test( deps = [ "//source/common/config:metadata_lib", "//source/common/http:message_lib", + "//source/common/router:string_accessor_lib", + "//source/common/stream_info:filter_state_lib", "//source/extensions/clusters/dynamic_modules:cluster_lib", "//source/extensions/dynamic_modules:abi_impl", "//source/extensions/load_balancing_policies/cluster_provided:config", @@ -30,6 +32,7 @@ envoy_cc_test( "//test/mocks/http:http_mocks", "//test/mocks/network:network_mocks", "//test/mocks/server:server_factory_context_mocks", + "//test/mocks/stream_info:stream_info_mocks", "//test/mocks/upstream:cluster_manager_mocks", "//test/mocks/upstream:load_balancer_context_mock", "//test/mocks/upstream:priority_set_mocks", @@ -46,12 +49,15 @@ envoy_cc_test( name = "integration_test", srcs = ["integration_test.cc"], data = [ + "//test/extensions/dynamic_modules/test_data/rust:cluster_filter_state_test", "//test/extensions/dynamic_modules/test_data/rust:cluster_integration_test", ], rbe_pool = "6gig", deps = [ + "//source/common/router:string_accessor_lib", "//source/extensions/clusters/dynamic_modules:cluster", "//source/extensions/dynamic_modules:abi_impl", + "//source/extensions/filters/http/dynamic_modules:factory_registration", "//source/extensions/load_balancing_policies/cluster_provided:config", "//test/extensions/dynamic_modules:util", "//test/integration:http_integration_lib", diff --git a/test/extensions/clusters/dynamic_modules/cluster_test.cc b/test/extensions/clusters/dynamic_modules/cluster_test.cc index 37bf05dee2447..1ca55f05236cc 100644 --- a/test/extensions/clusters/dynamic_modules/cluster_test.cc +++ b/test/extensions/clusters/dynamic_modules/cluster_test.cc @@ -6,6 +6,8 @@ #include "source/common/config/metadata.h" #include "source/common/http/message_impl.h" +#include "source/common/router/string_accessor_impl.h" +#include "source/common/stream_info/filter_state_impl.h" #include "source/extensions/clusters/dynamic_modules/cluster.h" #include "test/common/upstream/utility.h" @@ -13,6 +15,7 @@ #include "test/mocks/http/mocks.h" #include "test/mocks/network/connection.h" #include "test/mocks/server/server_factory_context.h" +#include "test/mocks/stream_info/mocks.h" #include "test/mocks/upstream/cluster_manager.h" #include "test/mocks/upstream/load_balancer_context.h" #include "test/mocks/upstream/priority_set.h" @@ -2121,6 +2124,149 @@ TEST_F(DynamicModuleClusterTest, LbContextGetDownstreamConnectionSniNullResult) context_ptr, nullptr)); } +namespace { + +// A filter state object that does not support serializeAsString — used to exercise the typed-get +// "object does not support serialization" failure branch. +class UnserializableFilterStateObject : public StreamInfo::FilterState::Object {}; + +} // namespace + +// Test get_filter_state_bytes with nullptr context. +TEST_F(DynamicModuleClusterTest, LbContextGetFilterStateBytesNullContext) { + std::string key = "k"; + envoy_dynamic_module_type_module_buffer key_buf = {key.data(), key.size()}; + envoy_dynamic_module_type_envoy_buffer result; + EXPECT_FALSE(envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes( + nullptr, key_buf, &result)); +} + +// Test get_filter_state_bytes with nullptr result. +TEST_F(DynamicModuleClusterTest, LbContextGetFilterStateBytesNullResult) { + NiceMock context; + auto* context_ptr = static_cast(&context); + std::string key = "k"; + envoy_dynamic_module_type_module_buffer key_buf = {key.data(), key.size()}; + EXPECT_FALSE(envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes( + context_ptr, key_buf, nullptr)); +} + +// Test get_filter_state_bytes when the request has no stream info. +TEST_F(DynamicModuleClusterTest, LbContextGetFilterStateBytesNoStreamInfo) { + NiceMock context; + ON_CALL(context, requestStreamInfo()).WillByDefault(Return(nullptr)); + auto* context_ptr = static_cast(&context); + std::string key = "k"; + envoy_dynamic_module_type_module_buffer key_buf = {key.data(), key.size()}; + envoy_dynamic_module_type_envoy_buffer result; + EXPECT_FALSE(envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes( + context_ptr, key_buf, &result)); +} + +// Test get_filter_state_bytes when the key is not present. +TEST_F(DynamicModuleClusterTest, LbContextGetFilterStateBytesNotFound) { + NiceMock context; + NiceMock stream_info; + ON_CALL(context, requestStreamInfo()).WillByDefault(Return(&stream_info)); + auto* context_ptr = static_cast(&context); + std::string key = "missing"; + envoy_dynamic_module_type_module_buffer key_buf = {key.data(), key.size()}; + envoy_dynamic_module_type_envoy_buffer result; + EXPECT_FALSE(envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes( + context_ptr, key_buf, &result)); +} + +// Test get_filter_state_bytes happy path: producer stored a StringAccessor; cluster reads it. +TEST_F(DynamicModuleClusterTest, LbContextGetFilterStateBytesFound) { + NiceMock context; + NiceMock stream_info; + stream_info.filter_state_->setData("k", std::make_unique("v"), + StreamInfo::FilterState::StateType::ReadOnly); + ON_CALL(context, requestStreamInfo()).WillByDefault(Return(&stream_info)); + auto* context_ptr = static_cast(&context); + std::string key = "k"; + envoy_dynamic_module_type_module_buffer key_buf = {key.data(), key.size()}; + envoy_dynamic_module_type_envoy_buffer result; + EXPECT_TRUE(envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes( + context_ptr, key_buf, &result)); + EXPECT_EQ("v", absl::string_view(result.ptr, result.length)); +} + +// Test get_filter_state_typed with nullptr context. +TEST_F(DynamicModuleClusterTest, LbContextGetFilterStateTypedNullContext) { + std::string key = "k"; + envoy_dynamic_module_type_module_buffer key_buf = {key.data(), key.size()}; + envoy_dynamic_module_type_envoy_buffer result; + EXPECT_FALSE(envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed( + nullptr, key_buf, &result)); +} + +// Test get_filter_state_typed with nullptr result. +TEST_F(DynamicModuleClusterTest, LbContextGetFilterStateTypedNullResult) { + NiceMock context; + auto* context_ptr = static_cast(&context); + std::string key = "k"; + envoy_dynamic_module_type_module_buffer key_buf = {key.data(), key.size()}; + EXPECT_FALSE(envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed( + context_ptr, key_buf, nullptr)); +} + +// Test get_filter_state_typed when the request has no stream info. +TEST_F(DynamicModuleClusterTest, LbContextGetFilterStateTypedNoStreamInfo) { + NiceMock context; + ON_CALL(context, requestStreamInfo()).WillByDefault(Return(nullptr)); + auto* context_ptr = static_cast(&context); + std::string key = "k"; + envoy_dynamic_module_type_module_buffer key_buf = {key.data(), key.size()}; + envoy_dynamic_module_type_envoy_buffer result; + EXPECT_FALSE(envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed( + context_ptr, key_buf, &result)); +} + +// Test get_filter_state_typed when the key is not present. +TEST_F(DynamicModuleClusterTest, LbContextGetFilterStateTypedNotFound) { + NiceMock context; + NiceMock stream_info; + ON_CALL(context, requestStreamInfo()).WillByDefault(Return(&stream_info)); + auto* context_ptr = static_cast(&context); + std::string key = "missing"; + envoy_dynamic_module_type_module_buffer key_buf = {key.data(), key.size()}; + envoy_dynamic_module_type_envoy_buffer result; + EXPECT_FALSE(envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed( + context_ptr, key_buf, &result)); +} + +// Test get_filter_state_typed when the stored object does not support serialization. +TEST_F(DynamicModuleClusterTest, LbContextGetFilterStateTypedNotSerializable) { + NiceMock context; + NiceMock stream_info; + stream_info.filter_state_->setData("k", std::make_unique(), + StreamInfo::FilterState::StateType::ReadOnly); + ON_CALL(context, requestStreamInfo()).WillByDefault(Return(&stream_info)); + auto* context_ptr = static_cast(&context); + std::string key = "k"; + envoy_dynamic_module_type_module_buffer key_buf = {key.data(), key.size()}; + envoy_dynamic_module_type_envoy_buffer result; + EXPECT_FALSE(envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed( + context_ptr, key_buf, &result)); +} + +// Test get_filter_state_typed happy path: producer stored a serializable object; cluster reads it. +TEST_F(DynamicModuleClusterTest, LbContextGetFilterStateTypedFound) { + NiceMock context; + NiceMock stream_info; + stream_info.filter_state_->setData("k", std::make_unique("typed-v"), + StreamInfo::FilterState::StateType::ReadOnly); + ON_CALL(context, requestStreamInfo()).WillByDefault(Return(&stream_info)); + auto* context_ptr = static_cast(&context); + std::string key = "k"; + envoy_dynamic_module_type_module_buffer key_buf = {key.data(), key.size()}; + envoy_dynamic_module_type_envoy_buffer result; + EXPECT_TRUE(envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed( + context_ptr, key_buf, &result)); + EXPECT_EQ("typed-v", absl::string_view(result.ptr, result.length)); +} + // ================================================================================================= // Async Host Selection Tests // ================================================================================================= diff --git a/test/extensions/clusters/dynamic_modules/integration_test.cc b/test/extensions/clusters/dynamic_modules/integration_test.cc index 97388f3842263..610c710dffec7 100644 --- a/test/extensions/clusters/dynamic_modules/integration_test.cc +++ b/test/extensions/clusters/dynamic_modules/integration_test.cc @@ -1,8 +1,11 @@ #include "envoy/config/bootstrap/v3/bootstrap.pb.h" #include "envoy/config/cluster/v3/cluster.pb.h" #include "envoy/extensions/clusters/dynamic_modules/v3/cluster.pb.h" +#include "envoy/registry/registry.h" +#include "envoy/stream_info/filter_state.h" #include "source/common/protobuf/protobuf.h" +#include "source/common/router/string_accessor_impl.h" #include "test/extensions/dynamic_modules/util.h" #include "test/integration/http_integration.h" @@ -12,6 +15,25 @@ namespace Extensions { namespace Clusters { namespace DynamicModules { +namespace { + +// ObjectFactory used by the cluster filter-state read test: the dynamic-module HTTP filter +// writes through this factory via envoy_dynamic_module_callback_http_set_filter_state_typed, +// and the dynamic-module cluster reads it back during host selection via +// envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed. +class ClusterTypedObjectFactory : public StreamInfo::FilterState::ObjectFactory { +public: + std::string name() const override { return "envoy.test.cluster_typed_object"; } + std::unique_ptr + createFromBytes(absl::string_view data) const override { + return std::make_unique(data); + } +}; + +REGISTER_FACTORY(ClusterTypedObjectFactory, StreamInfo::FilterState::ObjectFactory); + +} // namespace + class DynamicModuleClusterIntegrationTest : public testing::TestWithParam, public HttpIntegrationTest { @@ -133,6 +155,84 @@ TEST_P(DynamicModuleClusterIntegrationTest, LifecycleCallbacks) { EXPECT_EQ("200", response->headers().getStatusValue()); } +// ============================================================================= +// Filter-state read ABI: an upstream HTTP filter writes filter state on the +// request path; the dynamic-module cluster reads it back during host selection. +// ============================================================================= +class DynamicModuleClusterFilterStateIntegrationTest + : public testing::TestWithParam, + public HttpIntegrationTest { +public: + DynamicModuleClusterFilterStateIntegrationTest() + : HttpIntegrationTest(Http::CodecType::HTTP1, GetParam()) {} + + void initializeWithProducerAndReader() { + TestEnvironment::setEnvVar( + "ENVOY_DYNAMIC_MODULES_SEARCH_PATH", + TestEnvironment::runfilesPath("test/extensions/dynamic_modules/test_data/rust"), 1); + + // Prepend the dynamic-module HTTP filter so it writes filter state on every + // request before the router runs. + constexpr absl::string_view producer_filter_config = R"EOF( +name: envoy.extensions.filters.http.dynamic_modules +typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.dynamic_modules.v3.DynamicModuleFilter + dynamic_module_config: + name: cluster_filter_state_test + filter_name: filter_state_producer + filter_config: + "@type": type.googleapis.com/google.protobuf.StringValue + value: "" +)EOF"; + config_helper_.prependFilter(std::string(producer_filter_config)); + + // Replace cluster_0 with a dynamic-module cluster whose Rust load balancer + // reads the filter state we just wrote. + config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + auto* cluster = bootstrap.mutable_static_resources()->mutable_clusters(0); + const std::string upstream_address = fake_upstreams_[0]->localAddress()->asString(); + + cluster->set_name("cluster_0"); + cluster->set_lb_policy(envoy::config::cluster::v3::Cluster::CLUSTER_PROVIDED); + cluster->clear_load_assignment(); + + envoy::extensions::clusters::dynamic_modules::v3::ClusterConfig reader_config; + reader_config.mutable_dynamic_module_config()->set_name("cluster_filter_state_test"); + reader_config.set_cluster_name("filter_state_reader"); + + Protobuf::StringValue config_proto; + config_proto.set_value(upstream_address); + reader_config.mutable_cluster_config()->PackFrom(config_proto); + + cluster->mutable_cluster_type()->set_name("envoy.clusters.dynamic_modules"); + cluster->mutable_cluster_type()->mutable_typed_config()->PackFrom(reader_config); + }); + + HttpIntegrationTest::initialize(); + } +}; + +INSTANTIATE_TEST_SUITE_P(IpVersions, DynamicModuleClusterFilterStateIntegrationTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), + TestUtility::ipTestParamsToString); + +// Verifies that filter state written by an upstream HTTP filter on the request +// path is observable in the dynamic-module cluster's choose_host callback via +// both the bytes and the typed filter-state ABI accessors. The cluster returns +// its only host only when both values match the producer's payload, so a 200 +// response proves the round trip. +TEST_P(DynamicModuleClusterFilterStateIntegrationTest, ReadsFilterStateProducedByHttpFilter) { + initializeWithProducerAndReader(); + codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http"))); + + auto response = + sendRequestAndWaitForResponse(default_request_headers_, 0, default_response_headers_, 0); + + EXPECT_TRUE(upstream_request_->complete()); + EXPECT_TRUE(response->complete()); + EXPECT_EQ("200", response->headers().getStatusValue()); +} + } // namespace DynamicModules } // namespace Clusters } // namespace Extensions diff --git a/test/extensions/dynamic_modules/test_data/rust/BUILD b/test/extensions/dynamic_modules/test_data/rust/BUILD index 9185767f5fcef..dd86138a01889 100644 --- a/test/extensions/dynamic_modules/test_data/rust/BUILD +++ b/test/extensions/dynamic_modules/test_data/rust/BUILD @@ -59,3 +59,5 @@ test_program(name = "access_log_integration_test") test_program(name = "upstream_http_tcp_bridge") test_program(name = "cluster_integration_test") + +test_program(name = "cluster_filter_state_test") diff --git a/test/extensions/dynamic_modules/test_data/rust/cluster_filter_state_test.rs b/test/extensions/dynamic_modules/test_data/rust/cluster_filter_state_test.rs new file mode 100644 index 0000000000000..a344bc1566ee7 --- /dev/null +++ b/test/extensions/dynamic_modules/test_data/rust/cluster_filter_state_test.rs @@ -0,0 +1,158 @@ +//! Integration test for the cluster filter-state read ABI. +//! +//! Exposes both an HTTP filter and a cluster from the same shared library: +//! - The HTTP filter writes two filter state entries (bytes + typed) during ``on_request_headers``. +//! - The cluster reads those entries in ``choose_host`` and returns its pre-registered upstream +//! only when both values match the expected payload, otherwise it returns ``NoHost`` so the +//! request fails with a 503. +//! +//! The C++ test harness registers an ``ObjectFactory`` under +//! ``envoy.test.cluster_typed_object`` so the typed write/read path is exercised. + +use envoy_proxy_dynamic_modules_rust_sdk::*; +use std::sync::{Arc, Mutex}; + +const BYTES_KEY: &[u8] = b"test.cluster_filter_state.bytes_key"; +const BYTES_VALUE: &[u8] = b"bytes_value"; +const TYPED_KEY: &[u8] = b"envoy.test.cluster_typed_object"; +const TYPED_VALUE: &[u8] = b"typed_value"; + +declare_all_init_functions!( + my_program_init, + http: new_http_filter_config_fn, + cluster: new_cluster_config_fn, +); + +fn my_program_init() -> bool { + true +} + +// ------------------------------------------------------------------------------------- +// HTTP filter that produces filter state. +// ------------------------------------------------------------------------------------- + +fn new_http_filter_config_fn( + _envoy_filter_config: &mut EC, + name: &str, + _config: &[u8], +) -> Option>> { + match name { + "filter_state_producer" => Some(Box::new(FilterStateProducerConfig {})), + _ => None, + } +} + +struct FilterStateProducerConfig {} + +impl HttpFilterConfig for FilterStateProducerConfig { + fn new_http_filter(&self, _envoy: &mut EHF) -> Box> { + Box::new(FilterStateProducerFilter {}) + } +} + +struct FilterStateProducerFilter {} + +impl HttpFilter for FilterStateProducerFilter { + fn on_request_headers( + &mut self, + envoy_filter: &mut EHF, + _end_of_stream: bool, + ) -> abi::envoy_dynamic_module_type_on_http_filter_request_headers_status { + assert!(envoy_filter.set_filter_state_bytes(BYTES_KEY, BYTES_VALUE)); + assert!(envoy_filter.set_filter_state_typed(TYPED_KEY, TYPED_VALUE)); + abi::envoy_dynamic_module_type_on_http_filter_request_headers_status::Continue + } +} + +// ------------------------------------------------------------------------------------- +// Cluster that reads filter state during host selection. +// ------------------------------------------------------------------------------------- + +struct HostList(Vec); +// SAFETY: Host pointers are stable addresses managed by Envoy across threads. +unsafe impl Send for HostList {} +unsafe impl Sync for HostList {} + +type SharedHostList = Arc>; + +fn new_cluster_config_fn( + name: &str, + config: &[u8], + _envoy_cluster_metrics: Arc, +) -> Option> { + let upstream_address = std::str::from_utf8(config).unwrap_or("").to_string(); + match name { + "filter_state_reader" => Some(Box::new(FilterStateReaderClusterConfig { + upstream_address, + })), + _ => None, + } +} + +struct FilterStateReaderClusterConfig { + upstream_address: String, +} + +impl ClusterConfig for FilterStateReaderClusterConfig { + fn new_cluster(&self, _envoy_cluster: &dyn EnvoyCluster) -> Box { + Box::new(FilterStateReaderCluster { + upstream_address: self.upstream_address.clone(), + hosts: Arc::new(Mutex::new(HostList(Vec::new()))), + }) + } +} + +struct FilterStateReaderCluster { + upstream_address: String, + hosts: SharedHostList, +} + +impl Cluster for FilterStateReaderCluster { + fn on_init(&mut self, envoy_cluster: &dyn EnvoyCluster) { + let addresses = vec![self.upstream_address.clone()]; + let weights = vec![1u32]; + if let Some(host_ptrs) = envoy_cluster.add_hosts(&addresses, &weights) { + self.hosts.lock().unwrap().0 = host_ptrs; + } + envoy_cluster.pre_init_complete(); + } + + fn new_load_balancer(&self, _envoy_lb: &dyn EnvoyClusterLoadBalancer) -> Box { + Box::new(FilterStateReaderLb { + hosts: self.hosts.clone(), + }) + } +} + +struct FilterStateReaderLb { + hosts: SharedHostList, +} + +impl ClusterLb for FilterStateReaderLb { + fn choose_host( + &mut self, + context: Option<&dyn ClusterLbContext>, + _async_completion: Box, + ) -> HostSelectionResult { + let Some(ctx) = context else { + return HostSelectionResult::NoHost; + }; + let bytes = ctx.get_filter_state_bytes(BYTES_KEY); + let typed = ctx.get_filter_state_typed(TYPED_KEY); + let bytes_match = bytes.as_ref().map(|b| b.as_slice()) == Some(BYTES_VALUE); + let typed_match = typed.as_ref().map(|b| b.as_slice()) == Some(TYPED_VALUE); + if !bytes_match || !typed_match { + envoy_log_info!( + "filter_state_reader: mismatch — bytes_match={} typed_match={}", + bytes_match, + typed_match + ); + return HostSelectionResult::NoHost; + } + let hosts = self.hosts.lock().unwrap(); + if hosts.0.is_empty() { + return HostSelectionResult::NoHost; + } + HostSelectionResult::Selected(hosts.0[0]) + } +}