Skip to content

Commit b3db1af

Browse files
committed
dynamic_modules: add cluster filter-state read ABI
Adds two ABI callbacks so that a dynamic-module cluster's load balancer can read filter state set by an upstream HTTP filter (or any other producer) when picking a host. Today the cluster context only exposes downstream headers, hash key, override-host, and SNI — filter state was reachable from the C++ side via ``LoadBalancerContext::requestStreamInfo() ->filterState()`` but not surfaced through the ABI, forcing modules to fall back to header round-tripping. ABI additions: * envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes — reads a ``Router::StringAccessor``-typed filter state value. Mirrors the existing HTTP-filter ``get_filter_state_bytes``. * envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed — returns the serialized bytes of a typed filter state object via ``serializeAsString``. Mirrors the existing HTTP-filter ``get_filter_state_typed``. The serialized buffer is stashed in a thread-local string so the returned pointer survives the current host-selection callback. Weak fallback definitions are added alongside the existing cluster-LB weak stubs in ``source/extensions/dynamic_modules/abi_impl.cc`` so that binaries that link the Rust SDK without the cluster_lib (e.g. the HTTP filter integration test) still resolve cleanly. The Rust SDK gains matching ``ClusterLbContext::get_filter_state_bytes`` and ``ClusterLbContext::get_filter_state_typed`` methods. Tests: * test_data/rust/cluster_filter_state_test.rs — single shared library exporting an HTTP filter (``filter_state_producer``) that writes both a bytes value and a typed value during ``on_request_headers``, plus a cluster (``filter_state_reader``) whose load balancer reads both values in ``choose_host`` and only returns its host when both round-trip correctly. * integration_test.cc registers an ``ObjectFactory`` for the typed key and adds ``ReadsFilterStateProducedByHttpFilter``: prepends the producer HTTP filter via ``prependFilter``, swaps cluster_0 for the reader cluster, and asserts a 200 response — proving the filter-state round trip from HTTP filter to cluster host selection. Signed-off-by: Basundhara Chakrabarty <basundhara17061996@gmail.com>
1 parent a0ee8c6 commit b3db1af

12 files changed

Lines changed: 638 additions & 3 deletions

File tree

changelogs/current.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,13 @@ new_features:
123123
change: |
124124
Added ``envoy_dynamic_module_callback_is_validation_mode`` ABI callback that allows dynamic
125125
modules to check if the server is running in config validation mode.
126+
- area: dynamic_modules
127+
change: |
128+
Added ``envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes`` and
129+
``envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed`` ABI callbacks so
130+
that a dynamic-module cluster's load balancer can read filter state set by an upstream HTTP
131+
filter (or any other producer) when picking a host. The Rust SDK exposes these as
132+
``ClusterLbContext::get_filter_state_bytes`` and ``ClusterLbContext::get_filter_state_typed``.
126133
- area: access_log
127134
change: |
128135
Supported the singleton stats scope in the :ref:`stats access logger <envoy_v3_api_msg_extensions.access_loggers.stats.v3.Config>`.

source/extensions/clusters/dynamic_modules/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ envoy_cc_library(
2525
"//source/common/http:message_lib",
2626
"//source/common/network:address_lib",
2727
"//source/common/network:utility_lib",
28+
"//source/common/router:string_accessor_lib",
2829
"//source/common/stats:utility_lib",
2930
"//source/common/upstream:cluster_factory_lib",
3031
"//source/common/upstream:upstream_includes",

source/extensions/clusters/dynamic_modules/abi_impl.cc

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "source/common/common/assert.h"
66
#include "source/common/common/thread.h"
77
#include "source/common/http/message_impl.h"
8+
#include "source/common/router/string_accessor_impl.h"
89
#include "source/extensions/clusters/dynamic_modules/cluster.h"
910
#include "source/extensions/dynamic_modules/abi/abi.h"
1011

@@ -831,6 +832,70 @@ bool envoy_dynamic_module_callback_cluster_lb_context_get_downstream_connection_
831832
return true;
832833
}
833834

835+
bool envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes(
836+
envoy_dynamic_module_type_cluster_lb_context_envoy_ptr context_envoy_ptr,
837+
envoy_dynamic_module_type_module_buffer key, envoy_dynamic_module_type_envoy_buffer* result) {
838+
if (context_envoy_ptr == nullptr || result == nullptr) {
839+
return false;
840+
}
841+
auto* stream_info = getContext(context_envoy_ptr)->requestStreamInfo();
842+
if (!stream_info) {
843+
ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::dynamic_modules), debug,
844+
"stream info is not available");
845+
return false;
846+
}
847+
absl::string_view key_view(key.ptr, key.length);
848+
auto filter_state =
849+
stream_info->filterState()->getDataReadOnly<Envoy::Router::StringAccessor>(key_view);
850+
if (!filter_state) {
851+
ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::dynamic_modules), debug,
852+
"key '{}' not found in filter state", key_view);
853+
return false;
854+
}
855+
absl::string_view str = filter_state->asString();
856+
*result = {const_cast<char*>(str.data()), str.size()};
857+
return true;
858+
}
859+
860+
bool envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed(
861+
envoy_dynamic_module_type_cluster_lb_context_envoy_ptr context_envoy_ptr,
862+
envoy_dynamic_module_type_module_buffer key, envoy_dynamic_module_type_envoy_buffer* result) {
863+
if (context_envoy_ptr == nullptr || result == nullptr) {
864+
return false;
865+
}
866+
auto* stream_info = getContext(context_envoy_ptr)->requestStreamInfo();
867+
if (!stream_info) {
868+
ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::dynamic_modules), debug,
869+
"stream info is not available");
870+
return false;
871+
}
872+
873+
absl::string_view key_view(key.ptr, key.length);
874+
const auto* object = stream_info->filterState()->getDataReadOnlyGeneric(key_view);
875+
if (object == nullptr) {
876+
ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::dynamic_modules), debug,
877+
"key '{}' not found in filter state", key_view);
878+
return false;
879+
}
880+
881+
auto serialized = object->serializeAsString();
882+
if (!serialized.has_value()) {
883+
ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::dynamic_modules), debug,
884+
"filter state object for key '{}' does not support serialization",
885+
key_view);
886+
return false;
887+
}
888+
889+
// Cluster host selection runs on a worker thread serially per request. We stash the
890+
// serialized buffer on a thread-local so its address survives until the next call to
891+
// this function on the same thread (matching the documented lifetime in abi.h).
892+
thread_local std::string last_serialized_filter_state;
893+
last_serialized_filter_state = std::move(serialized.value());
894+
result->ptr = const_cast<char*>(last_serialized_filter_state.data());
895+
result->length = last_serialized_filter_state.size();
896+
return true;
897+
}
898+
834899
envoy_dynamic_module_type_cluster_scheduler_module_ptr
835900
envoy_dynamic_module_callback_cluster_scheduler_new(
836901
envoy_dynamic_module_type_cluster_envoy_ptr cluster_envoy_ptr) {

source/extensions/dynamic_modules/abi/abi.h

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9438,6 +9438,50 @@ bool envoy_dynamic_module_callback_cluster_lb_context_get_downstream_connection_
94389438
envoy_dynamic_module_type_cluster_lb_context_envoy_ptr context_envoy_ptr,
94399439
envoy_dynamic_module_type_envoy_buffer* result_buffer);
94409440

9441+
/**
9442+
* envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes is called by the module
9443+
* to get the bytes value of the request's filter state with the given key. This is the cluster
9444+
* load-balancer-context counterpart of
9445+
* envoy_dynamic_module_callback_http_get_filter_state_bytes: it lets a dynamic-module cluster
9446+
* consume filter state set by an upstream HTTP filter (or any other producer that stores a
9447+
* ``Router::StringAccessor``) when picking a host. If the filter state is not accessible, the
9448+
* key does not exist or the value is not a ``StringAccessor``, this returns false.
9449+
*
9450+
* @param context_envoy_ptr is the per-request load balancer context.
9451+
* @param key is the key of the filter state.
9452+
* @param result is the pointer to the pointer variable where the pointer to the buffer
9453+
* of the bytes value will be stored.
9454+
* @return true if the operation is successful, false otherwise.
9455+
*
9456+
* Note that the buffer pointed by the pointer stored in result is owned by Envoy, and
9457+
* is guaranteed to be valid for the duration of the current host-selection callback.
9458+
*/
9459+
bool envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes(
9460+
envoy_dynamic_module_type_cluster_lb_context_envoy_ptr context_envoy_ptr,
9461+
envoy_dynamic_module_type_module_buffer key, envoy_dynamic_module_type_envoy_buffer* result);
9462+
9463+
/**
9464+
* envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed is called by the module
9465+
* to get the serialized bytes value of a typed filter state object with the given key. This is
9466+
* the cluster load-balancer-context counterpart of
9467+
* envoy_dynamic_module_callback_http_get_filter_state_typed: it retrieves the object generically
9468+
* and calls serializeAsString to get the bytes representation. This works with any filter state
9469+
* object type, not just StringAccessor.
9470+
*
9471+
* @param context_envoy_ptr is the per-request load balancer context.
9472+
* @param key is the key of the filter state.
9473+
* @param result is the pointer to the buffer where the serialized value will be stored.
9474+
* @return true if the operation is successful, false if the stream info is not available, the key
9475+
* does not exist, or the object does not support serialization.
9476+
*
9477+
* Note that the buffer pointed by the pointer stored in result is owned by Envoy, and is
9478+
* guaranteed to be valid until the next invocation of this callback on the same worker thread
9479+
* or until the end of the current host-selection callback, whichever comes first.
9480+
*/
9481+
bool envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed(
9482+
envoy_dynamic_module_type_cluster_lb_context_envoy_ptr context_envoy_ptr,
9483+
envoy_dynamic_module_type_module_buffer key, envoy_dynamic_module_type_envoy_buffer* result);
9484+
94419485
/**
94429486
* envoy_dynamic_module_callback_cluster_lb_async_host_selection_complete is called by the module
94439487
* to deliver the result of an asynchronous host selection. This must be called exactly once for

source/extensions/dynamic_modules/abi_impl.cc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -745,6 +745,22 @@ envoy_dynamic_module_callback_cluster_lb_context_get_downstream_connection_sni(
745745
return false;
746746
}
747747

748+
__attribute__((weak)) bool envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes(
749+
envoy_dynamic_module_type_cluster_lb_context_envoy_ptr, envoy_dynamic_module_type_module_buffer,
750+
envoy_dynamic_module_type_envoy_buffer*) {
751+
IS_ENVOY_BUG("envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes: "
752+
"not implemented in this context");
753+
return false;
754+
}
755+
756+
__attribute__((weak)) bool envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed(
757+
envoy_dynamic_module_type_cluster_lb_context_envoy_ptr, envoy_dynamic_module_type_module_buffer,
758+
envoy_dynamic_module_type_envoy_buffer*) {
759+
IS_ENVOY_BUG("envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed: "
760+
"not implemented in this context");
761+
return false;
762+
}
763+
748764
__attribute__((weak)) envoy_dynamic_module_type_cluster_scheduler_module_ptr
749765
envoy_dynamic_module_callback_cluster_scheduler_new(envoy_dynamic_module_type_cluster_envoy_ptr) {
750766
IS_ENVOY_BUG("envoy_dynamic_module_callback_cluster_scheduler_new: "

source/extensions/dynamic_modules/sdk/rust/src/cluster.rs

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,19 @@
11
use crate::buffer::EnvoyBuffer;
22
use crate::{
3-
abi, drop_wrapped_c_void_ptr, str_to_module_buffer, strs_to_module_buffers, wrap_into_c_void_ptr,
4-
CompletionCallback, EnvoyCounterId, EnvoyCounterVecId, EnvoyGaugeId, EnvoyGaugeVecId,
5-
EnvoyHistogramId, EnvoyHistogramVecId, NEW_CLUSTER_CONFIG_FUNCTION,
3+
abi,
4+
bytes_to_module_buffer,
5+
drop_wrapped_c_void_ptr,
6+
str_to_module_buffer,
7+
strs_to_module_buffers,
8+
wrap_into_c_void_ptr,
9+
CompletionCallback,
10+
EnvoyCounterId,
11+
EnvoyCounterVecId,
12+
EnvoyGaugeId,
13+
EnvoyGaugeVecId,
14+
EnvoyHistogramId,
15+
EnvoyHistogramVecId,
16+
NEW_CLUSTER_CONFIG_FUNCTION,
617
};
718
use mockall::*;
819
use std::panic::{catch_unwind, AssertUnwindSafe};
@@ -226,6 +237,29 @@ pub trait ClusterLbContext {
226237
///
227238
/// Returns `None` if the downstream connection or SNI is not available.
228239
fn get_downstream_connection_sni(&self) -> Option<String>;
240+
241+
/// Returns the bytes value of a `Router::StringAccessor` filter state stored on the request.
242+
///
243+
/// This lets a cluster consume filter state that an upstream HTTP filter set via
244+
/// `EnvoyHttpFilter::set_filter_state_bytes` (or anything else that stores a `StringAccessor`)
245+
/// to make a host-selection decision.
246+
///
247+
/// Returns `None` if the request has no stream info, the key is not present, or the stored
248+
/// value is not a `StringAccessor`. The returned buffer borrows from Envoy and is valid for
249+
/// the duration of the current host-selection callback.
250+
fn get_filter_state_bytes<'a>(&'a self, key: &[u8]) -> Option<EnvoyBuffer<'a>>;
251+
252+
/// Returns the serialized bytes of a typed filter state object stored on the request.
253+
///
254+
/// This is the read side of `EnvoyHttpFilter::set_filter_state_typed` and works for any
255+
/// filter state object whose registered `ObjectFactory` produces an object that supports
256+
/// `serializeAsString`.
257+
///
258+
/// Returns `None` if the request has no stream info, the key is not present, or the object
259+
/// does not support serialization. The returned buffer borrows from Envoy and is valid until
260+
/// the next call to `get_filter_state_typed` on the same worker thread, or until the end of
261+
/// the current host-selection callback, whichever comes first.
262+
fn get_filter_state_typed<'a>(&'a self, key: &[u8]) -> Option<EnvoyBuffer<'a>>;
229263
}
230264

231265
/// Envoy-side cluster operations available to the module.
@@ -1830,6 +1864,44 @@ impl ClusterLbContext for ClusterLbContextImpl {
18301864
};
18311865
Some(sni.into_owned())
18321866
}
1867+
1868+
fn get_filter_state_bytes(&self, key: &[u8]) -> Option<EnvoyBuffer<'_>> {
1869+
let mut result = abi::envoy_dynamic_module_type_envoy_buffer {
1870+
ptr: std::ptr::null_mut(),
1871+
length: 0,
1872+
};
1873+
let success = unsafe {
1874+
abi::envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes(
1875+
self.raw_context,
1876+
bytes_to_module_buffer(key),
1877+
&mut result,
1878+
)
1879+
};
1880+
if success {
1881+
Some(unsafe { EnvoyBuffer::new_from_raw(result.ptr as *const _, result.length) })
1882+
} else {
1883+
None
1884+
}
1885+
}
1886+
1887+
fn get_filter_state_typed(&self, key: &[u8]) -> Option<EnvoyBuffer<'_>> {
1888+
let mut result = abi::envoy_dynamic_module_type_envoy_buffer {
1889+
ptr: std::ptr::null_mut(),
1890+
length: 0,
1891+
};
1892+
let success = unsafe {
1893+
abi::envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed(
1894+
self.raw_context,
1895+
bytes_to_module_buffer(key),
1896+
&mut result,
1897+
)
1898+
};
1899+
if success {
1900+
Some(unsafe { EnvoyBuffer::new_from_raw(result.ptr as *const _, result.length) })
1901+
} else {
1902+
None
1903+
}
1904+
}
18331905
}
18341906

18351907
// Cluster Event Hook Implementations

source/extensions/dynamic_modules/sdk/rust/src/lib_test.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4483,6 +4483,24 @@ pub extern "C" fn envoy_dynamic_module_callback_cluster_lb_context_get_downstrea
44834483
false
44844484
}
44854485

4486+
#[no_mangle]
4487+
pub extern "C" fn envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_bytes(
4488+
_context_envoy_ptr: abi::envoy_dynamic_module_type_cluster_lb_context_envoy_ptr,
4489+
_key: abi::envoy_dynamic_module_type_module_buffer,
4490+
_result: *mut abi::envoy_dynamic_module_type_envoy_buffer,
4491+
) -> bool {
4492+
false
4493+
}
4494+
4495+
#[no_mangle]
4496+
pub extern "C" fn envoy_dynamic_module_callback_cluster_lb_context_get_filter_state_typed(
4497+
_context_envoy_ptr: abi::envoy_dynamic_module_type_cluster_lb_context_envoy_ptr,
4498+
_key: abi::envoy_dynamic_module_type_module_buffer,
4499+
_result: *mut abi::envoy_dynamic_module_type_envoy_buffer,
4500+
) -> bool {
4501+
false
4502+
}
4503+
44864504
#[no_mangle]
44874505
pub extern "C" fn envoy_dynamic_module_callback_cluster_http_callout(
44884506
_cluster_envoy_ptr: abi::envoy_dynamic_module_type_cluster_envoy_ptr,

test/extensions/clusters/dynamic_modules/BUILD

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ envoy_cc_test(
2121
deps = [
2222
"//source/common/config:metadata_lib",
2323
"//source/common/http:message_lib",
24+
"//source/common/router:string_accessor_lib",
25+
"//source/common/stream_info:filter_state_lib",
2426
"//source/extensions/clusters/dynamic_modules:cluster_lib",
2527
"//source/extensions/dynamic_modules:abi_impl",
2628
"//source/extensions/load_balancing_policies/cluster_provided:config",
@@ -30,6 +32,7 @@ envoy_cc_test(
3032
"//test/mocks/http:http_mocks",
3133
"//test/mocks/network:network_mocks",
3234
"//test/mocks/server:instance_mocks",
35+
"//test/mocks/stream_info:stream_info_mocks",
3336
"//test/mocks/upstream:cluster_manager_mocks",
3437
"//test/mocks/upstream:load_balancer_context_mock",
3538
"//test/mocks/upstream:priority_set_mocks",
@@ -46,12 +49,15 @@ envoy_cc_test(
4649
name = "integration_test",
4750
srcs = ["integration_test.cc"],
4851
data = [
52+
"//test/extensions/dynamic_modules/test_data/rust:cluster_filter_state_test",
4953
"//test/extensions/dynamic_modules/test_data/rust:cluster_integration_test",
5054
],
5155
rbe_pool = "6gig",
5256
deps = [
57+
"//source/common/router:string_accessor_lib",
5358
"//source/extensions/clusters/dynamic_modules:cluster",
5459
"//source/extensions/dynamic_modules:abi_impl",
60+
"//source/extensions/filters/http/dynamic_modules:factory_registration",
5561
"//source/extensions/load_balancing_policies/cluster_provided:config",
5662
"//test/extensions/dynamic_modules:util",
5763
"//test/integration:http_integration_lib",

0 commit comments

Comments
 (0)