Skip to content

Commit 1d9717b

Browse files
committed
client routes: add exclusive-proxy config flag for strict proxy enforcement
Introduces advanced.client-routes.exclusive-proxy (boolean, default false) to control whether the driver falls back to a node's broadcast address when no route entry exists in system.client_routes. When false (default, backward-compatible): nodes without a route entry are reached directly via their broadcast address, preserving support for mixed proxy/direct topologies. When true: any node without a route entry is treated as unreachable; the driver throws IllegalStateException with a WARN log and the node stays DOWN until CLIENT_ROUTES_CHANGE populates the route. This matches the behavior previously hardcoded in the prior commit. Changes: - DefaultDriverOption: add CLIENT_ROUTES_EXCLUSIVE_PROXY - TypedDriverOption: add typed Boolean constant - reference.conf: add exclusive-proxy = false with full documentation - ClientRoutesEndPoint: restore fallbackEndPoint field; resolve() gates the throw/fallback on the exclusiveProxy flag - ClientRoutesTopologyMonitor: read config flag; pass fallback endpoint (null when exclusive-proxy=true) and flag to ClientRoutesEndPoint - Tests: update constructor calls; rename and add test cases for both modes; add createHandlerWithExclusiveProxy() helper
1 parent 90b8684 commit 1d9717b

7 files changed

Lines changed: 145 additions & 26 deletions

File tree

core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1119,7 +1119,22 @@ public enum DefaultDriverOption implements DriverOption {
11191119
*
11201120
* <p>Value type: boolean
11211121
*/
1122-
CLIENT_ROUTES_SHARD_AWARENESS_ENABLED("advanced.client-routes.shard-awarness-enabled");
1122+
CLIENT_ROUTES_SHARD_AWARENESS_ENABLED("advanced.client-routes.shard-awarness-enabled"),
1123+
1124+
/**
1125+
* Whether the driver connects exclusively through proxies when client routes are configured.
1126+
*
1127+
* <p>When {@code true}, any node whose {@code host_id} does not appear in the {@code
1128+
* system.client_routes} table is treated as unreachable; the driver will never fall back to the
1129+
* node's broadcast address. The node stays DOWN and the reconnection loop retries until a {@code
1130+
* CLIENT_ROUTES_CHANGE} event populates the route.
1131+
*
1132+
* <p>When {@code false} (the default), nodes that have no route entry are contacted directly
1133+
* using their broadcast address, preserving backward-compatible mixed proxy/direct topologies.
1134+
*
1135+
* <p>Value type: boolean
1136+
*/
1137+
CLIENT_ROUTES_EXCLUSIVE_PROXY("advanced.client-routes.exclusive-proxy");
11231138

11241139
private final String path;
11251140

core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -954,6 +954,15 @@ public String toString() {
954954
new TypedDriverOption<>(
955955
DefaultDriverOption.CLIENT_ROUTES_SHARD_AWARENESS_ENABLED, GenericType.BOOLEAN);
956956

957+
/**
958+
* Whether the driver connects exclusively through proxies when client routes are configured. When
959+
* {@code false} (default), nodes without a route entry are contacted directly via their broadcast
960+
* address.
961+
*/
962+
public static final TypedDriverOption<Boolean> CLIENT_ROUTES_EXCLUSIVE_PROXY =
963+
new TypedDriverOption<>(
964+
DefaultDriverOption.CLIENT_ROUTES_EXCLUSIVE_PROXY, GenericType.BOOLEAN);
965+
957966
private static Iterable<TypedDriverOption<?>> introspectBuiltInValues() {
958967
try {
959968
ImmutableList.Builder<TypedDriverOption<?>> result = ImmutableList.builder();

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/ClientRoutesEndPoint.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,21 +36,33 @@ public class ClientRoutesEndPoint implements EndPoint {
3636
private final UUID hostId;
3737
private final ClientRoutesTopologyMonitor topologyMonitor;
3838
private final String metricPrefix;
39+
@Nullable private final EndPoint fallbackEndPoint;
40+
private final boolean exclusiveProxy;
3941

4042
/**
4143
* @param topologyMonitor the topology monitor used to resolve the endpoint address on demand.
4244
* @param hostId the host UUID identifying this node in the cluster.
4345
* @param broadcastInetAddress the node's broadcast address (from system.peers or system.local),
4446
* used to build a stable metric prefix. May be {@code null} if the address could not be
4547
* determined, in which case the hostId is used as the metric prefix instead.
48+
* @param fallbackEndPoint the default endpoint to fall back to when {@code
49+
* topologyMonitor.resolve()} returns {@code null}, i.e. when this node is not accessed via a
50+
* cloud private endpoint. Ignored (and may be {@code null}) when {@code exclusiveProxy} is
51+
* {@code true}.
52+
* @param exclusiveProxy when {@code true}, {@link #resolve()} throws instead of falling back to
53+
* {@code fallbackEndPoint} if no route is found.
4654
*/
4755
public ClientRoutesEndPoint(
4856
@NonNull ClientRoutesTopologyMonitor topologyMonitor,
4957
@NonNull UUID hostId,
50-
@Nullable InetAddress broadcastInetAddress) {
58+
@Nullable InetAddress broadcastInetAddress,
59+
@Nullable EndPoint fallbackEndPoint,
60+
boolean exclusiveProxy) {
5161
this.topologyMonitor =
5262
Objects.requireNonNull(topologyMonitor, "Topology monitor cannot be null");
5363
this.hostId = Objects.requireNonNull(hostId, "HOST uuid cannot be null");
64+
this.fallbackEndPoint = fallbackEndPoint;
65+
this.exclusiveProxy = exclusiveProxy;
5466
this.metricPrefix = buildMetricPrefix(broadcastInetAddress, hostId);
5567
}
5668

@@ -70,11 +82,17 @@ public SocketAddress resolve() {
7082
} catch (IOException e) {
7183
throw new UncheckedIOException("DNS resolution failed for host_id=" + hostId, e);
7284
}
73-
// When client routes are configured, the driver must connect exclusively through proxies.
74-
// Falling back to the node's broadcast address would bypass the proxy infrastructure and
75-
// cause silent misbehaviour (e.g. during the window between adding a new node and posting
76-
// its client route entry). The node will remain DOWN and the reconnection loop will retry
77-
// until a CLIENT_ROUTES_CHANGE event populates the route.
85+
if (!exclusiveProxy && fallbackEndPoint != null) {
86+
// Default (backward-compatible) mode: fall back to the node's broadcast address.
87+
// This supports mixed proxy/direct topologies where some nodes are behind the private
88+
// endpoint and others are reached directly.
89+
return fallbackEndPoint.resolve();
90+
}
91+
// Exclusive-proxy mode: the driver must connect only through proxies. Falling back to the
92+
// node's broadcast address would bypass the proxy infrastructure and cause silent
93+
// misbehaviour (e.g. during the window between adding a new node and posting its client
94+
// route entry). The node will remain DOWN and the reconnection loop will retry until a
95+
// CLIENT_ROUTES_CHANGE event populates the route.
7896
LOG.warn(
7997
"No client route entry found for host_id={}. "
8098
+ "The node will remain DOWN until a route is published via CLIENT_ROUTES_CHANGE.",

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/ClientRoutesTopologyMonitor.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ public class ClientRoutesTopologyMonitor extends DefaultTopologyMonitor {
8282
private final String logPrefix;
8383
private final AtomicReference<Map<UUID, ClientRouteRecord>> resolvedRoutesCache;
8484
private final boolean useSSL;
85+
private final boolean exclusiveProxy;
8586
private volatile boolean closed = false;
8687
private final AtomicInteger consecutiveEmptyResults = new AtomicInteger(0);
8788

@@ -147,6 +148,11 @@ public ClientRoutesTopologyMonitor(
147148
this.logPrefix = context.getSessionName();
148149
this.resolvedRoutesCache = new AtomicReference<>(Collections.emptyMap());
149150
this.useSSL = context.getSslEngineFactory().isPresent();
151+
this.exclusiveProxy =
152+
context
153+
.getConfig()
154+
.getDefaultProfile()
155+
.getBoolean(DefaultDriverOption.CLIENT_ROUTES_EXCLUSIVE_PROXY);
150156
}
151157

152158
@Override
@@ -480,7 +486,9 @@ protected EndPoint buildNodeEndPoint(
480486
if (broadcastInetAddress == null) {
481487
broadcastInetAddress = row.getInetAddress("peer");
482488
}
483-
return new ClientRoutesEndPoint(this, hostId, broadcastInetAddress);
489+
EndPoint fallback =
490+
exclusiveProxy ? null : super.buildNodeEndPoint(row, broadcastRpcAddress, localEndPoint);
491+
return new ClientRoutesEndPoint(this, hostId, broadcastInetAddress, fallback, exclusiveProxy);
484492
}
485493

486494
/**

core/src/main/resources/reference.conf

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1173,6 +1173,19 @@ datastax-java-driver {
11731173
# Default: false
11741174
shard-awarness-enabled = false
11751175

1176+
# When true, the driver connects exclusively through proxies and will never fall back to a
1177+
# node's broadcast address when no route entry exists in system.client_routes. Any node
1178+
# without a route is treated as unreachable; it goes DOWN and the reconnection loop retries
1179+
# until a CLIENT_ROUTES_CHANGE event populates the route.
1180+
#
1181+
# When false (default), nodes that have no route entry are reached directly using their
1182+
# broadcast address, preserving backward-compatible mixed proxy/direct topologies where some
1183+
# nodes are behind the private endpoint and others are not.
1184+
#
1185+
# Required: no
1186+
# Default: false
1187+
exclusive-proxy = false
1188+
11761189
}
11771190

11781191
# Whether to resolve the addresses passed to `basic.contact-points`.

core/src/test/java/com/datastax/oss/driver/internal/core/metadata/ClientRoutesEndPointTest.java

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2222
import static org.mockito.Mockito.when;
2323

24+
import com.datastax.oss.driver.api.core.metadata.EndPoint;
2425
import java.io.UncheckedIOException;
2526
import java.net.InetAddress;
2627
import java.net.InetSocketAddress;
@@ -35,6 +36,7 @@
3536
public class ClientRoutesEndPointTest {
3637

3738
@Mock private ClientRoutesTopologyMonitor topologyMonitor;
39+
@Mock private EndPoint fallbackEndPoint;
3840

3941
// ---- resolve() ----------------------------------------------------------
4042

@@ -44,17 +46,18 @@ public void should_resolve_via_topology_monitor() throws UnknownHostException {
4446
InetSocketAddress expected = new InetSocketAddress("127.0.0.1", 9042);
4547
when(topologyMonitor.resolve(hostId)).thenReturn(expected);
4648

47-
ClientRoutesEndPoint ep = new ClientRoutesEndPoint(topologyMonitor, hostId, null);
49+
ClientRoutesEndPoint ep = new ClientRoutesEndPoint(topologyMonitor, hostId, null, null, false);
4850

4951
assertThat(ep.resolve()).isEqualTo(expected);
5052
}
5153

5254
@Test
53-
public void should_throw_when_resolve_returns_null() throws UnknownHostException {
55+
public void should_throw_when_exclusive_proxy_and_resolve_returns_null()
56+
throws UnknownHostException {
5457
UUID hostId = UUID.randomUUID();
5558
when(topologyMonitor.resolve(hostId)).thenReturn(null);
5659

57-
ClientRoutesEndPoint ep = new ClientRoutesEndPoint(topologyMonitor, hostId, null);
60+
ClientRoutesEndPoint ep = new ClientRoutesEndPoint(topologyMonitor, hostId, null, null, true);
5861

5962
assertThatThrownBy(ep::resolve)
6063
.isInstanceOf(IllegalStateException.class)
@@ -63,12 +66,26 @@ public void should_throw_when_resolve_returns_null() throws UnknownHostException
6366
.hasMessageContaining("exclusively through proxies");
6467
}
6568

69+
@Test
70+
public void should_fall_back_to_broadcast_when_exclusive_proxy_disabled()
71+
throws UnknownHostException {
72+
UUID hostId = UUID.randomUUID();
73+
InetSocketAddress fallbackAddress = new InetSocketAddress("10.0.0.1", 9042);
74+
when(topologyMonitor.resolve(hostId)).thenReturn(null);
75+
when(fallbackEndPoint.resolve()).thenReturn(fallbackAddress);
76+
77+
ClientRoutesEndPoint ep =
78+
new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint, false);
79+
80+
assertThat(ep.resolve()).isEqualTo(fallbackAddress);
81+
}
82+
6683
@Test
6784
public void should_wrap_io_exceptions_in_unchecked_io_exception() throws UnknownHostException {
6885
UUID hostId = UUID.randomUUID();
6986
when(topologyMonitor.resolve(hostId)).thenThrow(new UnknownHostException("no-such-host"));
7087

71-
ClientRoutesEndPoint ep = new ClientRoutesEndPoint(topologyMonitor, hostId, null);
88+
ClientRoutesEndPoint ep = new ClientRoutesEndPoint(topologyMonitor, hostId, null, null, false);
7289

7390
assertThatThrownBy(ep::resolve)
7491
.isInstanceOf(UncheckedIOException.class)
@@ -82,7 +99,7 @@ public void should_reflect_route_changes_on_subsequent_resolve() throws UnknownH
8299
InetSocketAddress addr2 = new InetSocketAddress("10.0.0.1", 9043);
83100
when(topologyMonitor.resolve(hostId)).thenReturn(addr1);
84101

85-
ClientRoutesEndPoint ep = new ClientRoutesEndPoint(topologyMonitor, hostId, null);
102+
ClientRoutesEndPoint ep = new ClientRoutesEndPoint(topologyMonitor, hostId, null, null, false);
86103

87104
assertThat(ep.resolve()).isEqualTo(addr1);
88105

@@ -97,25 +114,27 @@ public void should_reflect_route_changes_on_subsequent_resolve() throws UnknownH
97114
@Test
98115
public void should_be_equal_when_same_host_id() {
99116
UUID hostId = UUID.randomUUID();
100-
ClientRoutesEndPoint ep1 = new ClientRoutesEndPoint(topologyMonitor, hostId, null);
101-
ClientRoutesEndPoint ep2 = new ClientRoutesEndPoint(topologyMonitor, hostId, null);
117+
ClientRoutesEndPoint ep1 = new ClientRoutesEndPoint(topologyMonitor, hostId, null, null, false);
118+
ClientRoutesEndPoint ep2 = new ClientRoutesEndPoint(topologyMonitor, hostId, null, null, false);
102119

103120
assertThat(ep1).isEqualTo(ep2);
104121
assertThat(ep1.hashCode()).isEqualTo(ep2.hashCode());
105122
}
106123

107124
@Test
108125
public void should_not_be_equal_when_different_host_id() {
109-
ClientRoutesEndPoint ep1 = new ClientRoutesEndPoint(topologyMonitor, UUID.randomUUID(), null);
110-
ClientRoutesEndPoint ep2 = new ClientRoutesEndPoint(topologyMonitor, UUID.randomUUID(), null);
126+
ClientRoutesEndPoint ep1 =
127+
new ClientRoutesEndPoint(topologyMonitor, UUID.randomUUID(), null, null, false);
128+
ClientRoutesEndPoint ep2 =
129+
new ClientRoutesEndPoint(topologyMonitor, UUID.randomUUID(), null, null, false);
111130

112131
assertThat(ep1).isNotEqualTo(ep2);
113132
}
114133

115134
@Test
116135
public void should_not_be_equal_to_non_client_routes_endpoint() {
117136
UUID hostId = UUID.randomUUID();
118-
ClientRoutesEndPoint ep = new ClientRoutesEndPoint(topologyMonitor, hostId, null);
137+
ClientRoutesEndPoint ep = new ClientRoutesEndPoint(topologyMonitor, hostId, null, null, false);
119138

120139
assertThat(ep).isNotEqualTo("not an endpoint");
121140
assertThat(ep).isNotEqualTo(null);
@@ -126,7 +145,7 @@ public void should_not_be_equal_to_non_client_routes_endpoint() {
126145
@Test
127146
public void should_use_host_id_as_metric_prefix_when_address_is_null() {
128147
UUID hostId = UUID.randomUUID();
129-
ClientRoutesEndPoint ep = new ClientRoutesEndPoint(topologyMonitor, hostId, null);
148+
ClientRoutesEndPoint ep = new ClientRoutesEndPoint(topologyMonitor, hostId, null, null, false);
130149

131150
assertThat(ep.asMetricPrefix()).isEqualTo(hostId.toString());
132151
}
@@ -135,7 +154,7 @@ public void should_use_host_id_as_metric_prefix_when_address_is_null() {
135154
public void should_format_ipv4_metric_prefix() throws Exception {
136155
UUID hostId = UUID.randomUUID();
137156
InetAddress ipv4 = InetAddress.getByAddress(new byte[] {10, 0, 0, 1});
138-
ClientRoutesEndPoint ep = new ClientRoutesEndPoint(topologyMonitor, hostId, ipv4);
157+
ClientRoutesEndPoint ep = new ClientRoutesEndPoint(topologyMonitor, hostId, ipv4, null, false);
139158

140159
assertThat(ep.asMetricPrefix()).isEqualTo("10_0_0_1_" + hostId);
141160
}
@@ -145,7 +164,7 @@ public void should_format_ipv6_metric_prefix() throws Exception {
145164
UUID hostId = UUID.randomUUID();
146165
InetAddress ipv6 =
147166
InetAddress.getByAddress(new byte[] {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1});
148-
ClientRoutesEndPoint ep = new ClientRoutesEndPoint(topologyMonitor, hostId, ipv6);
167+
ClientRoutesEndPoint ep = new ClientRoutesEndPoint(topologyMonitor, hostId, ipv6, null, false);
149168

150169
// IPv6 keeps colons (consistent with DefaultEndPoint), dots replaced by underscores
151170
assertThat(ep.asMetricPrefix()).isEqualTo("0:0:0:0:0:0:0:1_" + hostId);
@@ -156,7 +175,7 @@ public void should_format_ipv6_metric_prefix() throws Exception {
156175
@Test
157176
public void should_return_host_id_as_string() {
158177
UUID hostId = UUID.randomUUID();
159-
ClientRoutesEndPoint ep = new ClientRoutesEndPoint(topologyMonitor, hostId, null);
178+
ClientRoutesEndPoint ep = new ClientRoutesEndPoint(topologyMonitor, hostId, null, null, false);
160179

161180
assertThat(ep.toString()).isEqualTo("ClientRoutesEndPoint(" + hostId + ")");
162181
}

core/src/test/java/com/datastax/oss/driver/internal/core/metadata/ClientRoutesTopologyMonitorTest.java

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,8 @@ public void setup() {
142142
when(defaultProfile.getDuration(DefaultDriverOption.CONTROL_CONNECTION_TIMEOUT))
143143
.thenReturn(Duration.ofSeconds(5));
144144
when(defaultProfile.getBoolean(DefaultDriverOption.RECONNECT_ON_INIT)).thenReturn(false);
145+
when(defaultProfile.getBoolean(DefaultDriverOption.CLIENT_ROUTES_EXCLUSIVE_PROXY))
146+
.thenReturn(false);
145147
when(context.getSslEngineFactory()).thenReturn(Optional.empty());
146148
ClientRoutesConfig config =
147149
ClientRoutesConfig.builder()
@@ -162,6 +164,21 @@ private void initHandler() {
162164
handler.init();
163165
}
164166

167+
/**
168+
* Creates a fresh handler with the given {@code exclusiveProxy} setting, sharing all other
169+
* context stubs from {@link #setup()}.
170+
*/
171+
private TestableClientRoutesTopologyMonitor createHandlerWithExclusiveProxy(
172+
boolean exclusiveProxy) {
173+
when(defaultProfile.getBoolean(DefaultDriverOption.CLIENT_ROUTES_EXCLUSIVE_PROXY))
174+
.thenReturn(exclusiveProxy);
175+
ClientRoutesConfig config =
176+
ClientRoutesConfig.builder()
177+
.addEndpoint(new ClientRouteProxy(connectionId, "host1"))
178+
.build();
179+
return new TestableClientRoutesTopologyMonitor(context, config);
180+
}
181+
165182
// ---- resolve() -------------------------------------------------------
166183

167184
@Test
@@ -1133,16 +1150,16 @@ hostId1, new ClientRouteRecord(hostId1, "127.0.0.1", 9042),
11331150
}
11341151

11351152
@Test
1136-
public void should_throw_when_no_route_for_host_id() {
1137-
// Simulates a node that has no route in the client_routes table (e.g. newly added node
1138-
// before route is published). resolve() must throw instead of falling back to broadcast
1153+
public void should_throw_when_no_route_for_host_id_and_exclusive_proxy_enabled() {
1154+
// With exclusive-proxy=true: resolve() must throw instead of falling back to broadcast
11391155
// address, to prevent the driver from bypassing proxy infrastructure.
1156+
TestableClientRoutesTopologyMonitor exclusiveHandler = createHandlerWithExclusiveProxy(true);
11401157
UUID hostId = UUID.randomUUID();
11411158
AdminRow row = Mockito.mock(AdminRow.class);
11421159
when(row.getUuid("host_id")).thenReturn(hostId);
11431160
EndPoint localEndPoint = Mockito.mock(EndPoint.class);
11441161

1145-
EndPoint endpoint = handler.buildNodeEndPoint(row, null, localEndPoint);
1162+
EndPoint endpoint = exclusiveHandler.buildNodeEndPoint(row, null, localEndPoint);
11461163
assertThat(endpoint).isInstanceOf(ClientRoutesEndPoint.class);
11471164

11481165
// Cache is empty (no route for this host_id) → must throw, not fall back
@@ -1152,6 +1169,26 @@ public void should_throw_when_no_route_for_host_id() {
11521169
.hasMessageContaining(hostId.toString());
11531170
}
11541171

1172+
@Test
1173+
public void should_fall_back_to_broadcast_when_no_route_and_exclusive_proxy_disabled()
1174+
throws Exception {
1175+
// With exclusive-proxy=false (default): resolve() delegates to the fallback endpoint when
1176+
// no route exists, supporting mixed proxy/direct topologies.
1177+
UUID hostId = UUID.randomUUID();
1178+
AdminRow row = Mockito.mock(AdminRow.class);
1179+
when(row.getUuid("host_id")).thenReturn(hostId);
1180+
EndPoint localEndPoint = Mockito.mock(EndPoint.class);
1181+
InetSocketAddress directAddress = new InetSocketAddress("10.0.0.1", 9042);
1182+
when(localEndPoint.resolve()).thenReturn(directAddress);
1183+
1184+
// handler uses exclusiveProxy=false (set in setup())
1185+
EndPoint endpoint = handler.buildNodeEndPoint(row, null, localEndPoint);
1186+
assertThat(endpoint).isInstanceOf(ClientRoutesEndPoint.class);
1187+
1188+
// Cache is empty → falls back to the direct broadcast address
1189+
assertThat(endpoint.resolve()).isEqualTo(directAddress);
1190+
}
1191+
11551192
// ---- savePort() --------------------------------------------------------
11561193

11571194
@Test

0 commit comments

Comments
 (0)