Skip to content

Commit b870f1d

Browse files
committed
client routes: disable fallback to broadcast address when route is missing
When client routes are configured, the driver must connect exclusively through proxies (addresses from system.client_routes). The previous behavior silently fell back to the node's direct broadcast address when no route entry existed for a host_id, which caused races and silent misbehaviour upon adding new nodes (time window between node addition and client route entry publication). Now ClientRoutesEndPoint.resolve() throws IllegalStateException when no route is found, causing the node to go DOWN and enter the standard reconnection loop until a CLIENT_ROUTES_CHANGE event populates the route. This matches the Rust driver's approach of failing address translation on missing entries. Changes: - ClientRoutesEndPoint: remove fallbackEndPoint field; resolve() throws with WARN log instead of falling back to broadcast address - ClientRoutesTopologyMonitor.buildNodeEndPoint(): remove fallback computation; throw on null host_id instead of delegating to super - Tests updated to verify exception-throwing behavior
1 parent fc725a0 commit b870f1d

4 files changed

Lines changed: 60 additions & 66 deletions

File tree

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/ClientRoutesEndPoint.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,33 +27,30 @@
2727
import java.net.SocketAddress;
2828
import java.util.Objects;
2929
import java.util.UUID;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
3032

3133
public class ClientRoutesEndPoint implements EndPoint {
34+
private static final Logger LOG = LoggerFactory.getLogger(ClientRoutesEndPoint.class);
35+
3236
private final UUID hostId;
3337
private final ClientRoutesTopologyMonitor topologyMonitor;
3438
private final String metricPrefix;
35-
@NonNull private final EndPoint fallbackEndPoint;
3639

3740
/**
3841
* @param topologyMonitor the topology monitor used to resolve the endpoint address on demand.
3942
* @param hostId the host UUID identifying this node in the cluster.
4043
* @param broadcastInetAddress the node's broadcast address (from system.peers or system.local),
4144
* used to build a stable metric prefix. May be {@code null} if the address could not be
4245
* determined, in which case the hostId is used as the metric prefix instead.
43-
* @param fallbackEndPoint the default endpoint to fall back to when {@code
44-
* topologyMonitor.resolve()} returns {@code null}, i.e. when this node is not accessed via a
45-
* cloud private endpoint. Must not be {@code null}.
4646
*/
4747
public ClientRoutesEndPoint(
4848
@NonNull ClientRoutesTopologyMonitor topologyMonitor,
4949
@NonNull UUID hostId,
50-
@Nullable InetAddress broadcastInetAddress,
51-
@NonNull EndPoint fallbackEndPoint) {
50+
@Nullable InetAddress broadcastInetAddress) {
5251
this.topologyMonitor =
5352
Objects.requireNonNull(topologyMonitor, "Topology monitor cannot be null");
5453
this.hostId = Objects.requireNonNull(hostId, "HOST uuid cannot be null");
55-
this.fallbackEndPoint =
56-
Objects.requireNonNull(fallbackEndPoint, "Fallback endpoint cannot be null");
5754
this.metricPrefix = buildMetricPrefix(broadcastInetAddress, hostId);
5855
}
5956

@@ -73,7 +70,20 @@ public SocketAddress resolve() {
7370
} catch (IOException e) {
7471
throw new UncheckedIOException("DNS resolution failed for host_id=" + hostId, e);
7572
}
76-
return fallbackEndPoint.resolve();
73+
// When client routes are configured, the driver must connect exclusively through proxies.
74+
// Falling back to the node's broadcast address would bypass the proxy infrastructure and
75+
// cause silent misbehaviour (e.g. during the window between adding a new node and posting
76+
// its client route entry). The node will remain DOWN and the reconnection loop will retry
77+
// until a CLIENT_ROUTES_CHANGE event populates the route.
78+
LOG.warn(
79+
"No client route entry found for host_id={}. "
80+
+ "The node will remain DOWN until a route is published via CLIENT_ROUTES_CHANGE.",
81+
hostId);
82+
throw new IllegalStateException(
83+
"No client route entry found for host_id="
84+
+ hostId
85+
+ ". Will not connect to the node's broadcast address because client routes "
86+
+ "are configured; the driver must connect exclusively through proxies.");
7787
}
7888

7989
@Override

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/ClientRoutesTopologyMonitor.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -461,12 +461,15 @@ protected EndPoint buildNodeEndPoint(
461461
LOG.warn(
462462
"[{}] host_id is null in system row for address {} — cannot assign a client route. "
463463
+ "This may indicate corrupted system tables. "
464-
+ "Falling back to default endpoint resolution.",
464+
+ "The node will be ignored because client routes are configured "
465+
+ "and the driver must connect exclusively through proxies.",
465466
logPrefix,
466467
broadcastRpcAddress);
467-
return super.buildNodeEndPoint(row, broadcastRpcAddress, localEndPoint);
468+
throw new IllegalStateException(
469+
"host_id is null in system row for address "
470+
+ broadcastRpcAddress
471+
+ "; cannot build a ClientRoutesEndPoint without a host_id");
468472
}
469-
EndPoint fallback = super.buildNodeEndPoint(row, broadcastRpcAddress, localEndPoint);
470473
InetAddress broadcastInetAddress = null;
471474
if (broadcastRpcAddress != null) {
472475
broadcastInetAddress = broadcastRpcAddress.getAddress();
@@ -477,7 +480,7 @@ protected EndPoint buildNodeEndPoint(
477480
if (broadcastInetAddress == null) {
478481
broadcastInetAddress = row.getInetAddress("peer");
479482
}
480-
return new ClientRoutesEndPoint(this, hostId, broadcastInetAddress, fallback);
483+
return new ClientRoutesEndPoint(this, hostId, broadcastInetAddress);
481484
}
482485

483486
/**

core/src/test/java/com/datastax/oss/driver/internal/core/metadata/ClientRoutesEndPointTest.java

Lines changed: 19 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2222
import static org.mockito.Mockito.when;
2323

24-
import com.datastax.oss.driver.api.core.metadata.EndPoint;
2524
import java.io.UncheckedIOException;
2625
import java.net.InetAddress;
2726
import java.net.InetSocketAddress;
@@ -36,7 +35,6 @@
3635
public class ClientRoutesEndPointTest {
3736

3837
@Mock private ClientRoutesTopologyMonitor topologyMonitor;
39-
@Mock private EndPoint fallbackEndPoint;
4038

4139
// ---- resolve() ----------------------------------------------------------
4240

@@ -46,32 +44,31 @@ public void should_resolve_via_topology_monitor() throws UnknownHostException {
4644
InetSocketAddress expected = new InetSocketAddress("127.0.0.1", 9042);
4745
when(topologyMonitor.resolve(hostId)).thenReturn(expected);
4846

49-
ClientRoutesEndPoint ep =
50-
new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint);
47+
ClientRoutesEndPoint ep = new ClientRoutesEndPoint(topologyMonitor, hostId, null);
5148

5249
assertThat(ep.resolve()).isEqualTo(expected);
5350
}
5451

5552
@Test
56-
public void should_fallback_when_resolve_returns_null() throws UnknownHostException {
53+
public void should_throw_when_resolve_returns_null() throws UnknownHostException {
5754
UUID hostId = UUID.randomUUID();
58-
InetSocketAddress fallbackAddr = new InetSocketAddress("10.0.0.1", 9042);
5955
when(topologyMonitor.resolve(hostId)).thenReturn(null);
60-
when(fallbackEndPoint.resolve()).thenReturn(fallbackAddr);
6156

62-
ClientRoutesEndPoint ep =
63-
new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint);
57+
ClientRoutesEndPoint ep = new ClientRoutesEndPoint(topologyMonitor, hostId, null);
6458

65-
assertThat(ep.resolve()).isEqualTo(fallbackAddr);
59+
assertThatThrownBy(ep::resolve)
60+
.isInstanceOf(IllegalStateException.class)
61+
.hasMessageContaining("No client route entry found")
62+
.hasMessageContaining(hostId.toString())
63+
.hasMessageContaining("exclusively through proxies");
6664
}
6765

6866
@Test
6967
public void should_wrap_io_exceptions_in_unchecked_io_exception() throws UnknownHostException {
7068
UUID hostId = UUID.randomUUID();
7169
when(topologyMonitor.resolve(hostId)).thenThrow(new UnknownHostException("no-such-host"));
7270

73-
ClientRoutesEndPoint ep =
74-
new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint);
71+
ClientRoutesEndPoint ep = new ClientRoutesEndPoint(topologyMonitor, hostId, null);
7572

7673
assertThatThrownBy(ep::resolve)
7774
.isInstanceOf(UncheckedIOException.class)
@@ -85,8 +82,7 @@ public void should_reflect_route_changes_on_subsequent_resolve() throws UnknownH
8582
InetSocketAddress addr2 = new InetSocketAddress("10.0.0.1", 9043);
8683
when(topologyMonitor.resolve(hostId)).thenReturn(addr1);
8784

88-
ClientRoutesEndPoint ep =
89-
new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint);
85+
ClientRoutesEndPoint ep = new ClientRoutesEndPoint(topologyMonitor, hostId, null);
9086

9187
assertThat(ep.resolve()).isEqualTo(addr1);
9288

@@ -101,30 +97,25 @@ public void should_reflect_route_changes_on_subsequent_resolve() throws UnknownH
10197
@Test
10298
public void should_be_equal_when_same_host_id() {
10399
UUID hostId = UUID.randomUUID();
104-
ClientRoutesEndPoint ep1 =
105-
new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint);
106-
ClientRoutesEndPoint ep2 =
107-
new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint);
100+
ClientRoutesEndPoint ep1 = new ClientRoutesEndPoint(topologyMonitor, hostId, null);
101+
ClientRoutesEndPoint ep2 = new ClientRoutesEndPoint(topologyMonitor, hostId, null);
108102

109103
assertThat(ep1).isEqualTo(ep2);
110104
assertThat(ep1.hashCode()).isEqualTo(ep2.hashCode());
111105
}
112106

113107
@Test
114108
public void should_not_be_equal_when_different_host_id() {
115-
ClientRoutesEndPoint ep1 =
116-
new ClientRoutesEndPoint(topologyMonitor, UUID.randomUUID(), null, fallbackEndPoint);
117-
ClientRoutesEndPoint ep2 =
118-
new ClientRoutesEndPoint(topologyMonitor, UUID.randomUUID(), null, fallbackEndPoint);
109+
ClientRoutesEndPoint ep1 = new ClientRoutesEndPoint(topologyMonitor, UUID.randomUUID(), null);
110+
ClientRoutesEndPoint ep2 = new ClientRoutesEndPoint(topologyMonitor, UUID.randomUUID(), null);
119111

120112
assertThat(ep1).isNotEqualTo(ep2);
121113
}
122114

123115
@Test
124116
public void should_not_be_equal_to_non_client_routes_endpoint() {
125117
UUID hostId = UUID.randomUUID();
126-
ClientRoutesEndPoint ep =
127-
new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint);
118+
ClientRoutesEndPoint ep = new ClientRoutesEndPoint(topologyMonitor, hostId, null);
128119

129120
assertThat(ep).isNotEqualTo("not an endpoint");
130121
assertThat(ep).isNotEqualTo(null);
@@ -135,8 +126,7 @@ public void should_not_be_equal_to_non_client_routes_endpoint() {
135126
@Test
136127
public void should_use_host_id_as_metric_prefix_when_address_is_null() {
137128
UUID hostId = UUID.randomUUID();
138-
ClientRoutesEndPoint ep =
139-
new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint);
129+
ClientRoutesEndPoint ep = new ClientRoutesEndPoint(topologyMonitor, hostId, null);
140130

141131
assertThat(ep.asMetricPrefix()).isEqualTo(hostId.toString());
142132
}
@@ -145,8 +135,7 @@ public void should_use_host_id_as_metric_prefix_when_address_is_null() {
145135
public void should_format_ipv4_metric_prefix() throws Exception {
146136
UUID hostId = UUID.randomUUID();
147137
InetAddress ipv4 = InetAddress.getByAddress(new byte[] {10, 0, 0, 1});
148-
ClientRoutesEndPoint ep =
149-
new ClientRoutesEndPoint(topologyMonitor, hostId, ipv4, fallbackEndPoint);
138+
ClientRoutesEndPoint ep = new ClientRoutesEndPoint(topologyMonitor, hostId, ipv4);
150139

151140
assertThat(ep.asMetricPrefix()).isEqualTo("10_0_0_1_" + hostId);
152141
}
@@ -156,8 +145,7 @@ public void should_format_ipv6_metric_prefix() throws Exception {
156145
UUID hostId = UUID.randomUUID();
157146
InetAddress ipv6 =
158147
InetAddress.getByAddress(new byte[] {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1});
159-
ClientRoutesEndPoint ep =
160-
new ClientRoutesEndPoint(topologyMonitor, hostId, ipv6, fallbackEndPoint);
148+
ClientRoutesEndPoint ep = new ClientRoutesEndPoint(topologyMonitor, hostId, ipv6);
161149

162150
// IPv6 keeps colons (consistent with DefaultEndPoint), dots replaced by underscores
163151
assertThat(ep.asMetricPrefix()).isEqualTo("0:0:0:0:0:0:0:1_" + hostId);
@@ -168,8 +156,7 @@ public void should_format_ipv6_metric_prefix() throws Exception {
168156
@Test
169157
public void should_return_host_id_as_string() {
170158
UUID hostId = UUID.randomUUID();
171-
ClientRoutesEndPoint ep =
172-
new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint);
159+
ClientRoutesEndPoint ep = new ClientRoutesEndPoint(topologyMonitor, hostId, null);
173160

174161
assertThat(ep.toString()).isEqualTo("ClientRoutesEndPoint(" + hostId + ")");
175162
}

core/src/test/java/com/datastax/oss/driver/internal/core/metadata/ClientRoutesTopologyMonitorTest.java

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
3939
import edu.umd.cs.findbugs.annotations.NonNull;
4040
import java.net.InetSocketAddress;
41-
import java.net.SocketAddress;
4241
import java.net.UnknownHostException;
4342
import java.time.Duration;
4443
import java.util.ArrayList;
@@ -1041,19 +1040,16 @@ public void should_not_propagate_exception_when_query_fails() throws Exception {
10411040
// ---- buildNodeEndPoint fallback -----------------------------------------
10421041

10431042
@Test
1044-
public void should_build_default_endpoint_when_host_id_is_null() {
1045-
// row.getUuid("host_id") returns null, triggering the hostId == null
1046-
// branch in buildNodeEndPoint which delegates to super.buildNodeEndPoint().
1043+
public void should_throw_when_host_id_is_null() {
1044+
// row.getUuid("host_id") returns null — with client routes configured, the driver
1045+
// must not fall back to direct broadcast address, so buildNodeEndPoint throws.
10471046
AdminRow row = Mockito.mock(AdminRow.class);
10481047
when(row.getUuid("host_id")).thenReturn(null);
1049-
when(row.contains("peer")).thenReturn(false); // local-node row → super returns localEndPoint
10501048
EndPoint localEndPoint = Mockito.mock(EndPoint.class);
10511049

1052-
EndPoint result = handler.buildNodeEndPoint(row, null, localEndPoint);
1053-
1054-
// hostId == null branch → super.buildNodeEndPoint() is called → returns localEndPoint
1055-
assertThat(result).isNotInstanceOf(ClientRoutesEndPoint.class);
1056-
assertThat(result).isSameAs(localEndPoint);
1050+
assertThatThrownBy(() -> handler.buildNodeEndPoint(row, null, localEndPoint))
1051+
.isInstanceOf(IllegalStateException.class)
1052+
.hasMessageContaining("host_id is null");
10571053
}
10581054

10591055
@Test
@@ -1065,7 +1061,6 @@ public void should_build_client_routes_endpoint_when_host_id_non_null() {
10651061
UUID hostId = UUID.randomUUID();
10661062
AdminRow row = Mockito.mock(AdminRow.class);
10671063
when(row.getUuid("host_id")).thenReturn(hostId);
1068-
when(row.contains("peer")).thenReturn(false);
10691064
EndPoint localEndPoint = Mockito.mock(EndPoint.class);
10701065

10711066
EndPoint result = handler.buildNodeEndPoint(row, null, localEndPoint);
@@ -1138,24 +1133,23 @@ hostId1, new ClientRouteRecord(hostId1, "127.0.0.1", 9042),
11381133
}
11391134

11401135
@Test
1141-
public void should_resolve_to_fallback_when_no_route_for_host_id() {
1142-
// Simulates a node that is not accessed via PrivateLink (no route in cache for its host_id).
1143-
// resolve() must return the regular endpoint address (the fallback), not throw.
1136+
public void should_throw_when_no_route_for_host_id() {
1137+
// Simulates a node that has no route in the client_routes table (e.g. newly added node
1138+
// before route is published). resolve() must throw instead of falling back to broadcast
1139+
// address, to prevent the driver from bypassing proxy infrastructure.
11441140
UUID hostId = UUID.randomUUID();
1145-
InetSocketAddress fallbackAddress = new InetSocketAddress("127.0.0.99", 9999);
11461141
AdminRow row = Mockito.mock(AdminRow.class);
11471142
when(row.getUuid("host_id")).thenReturn(hostId);
1148-
when(row.contains("peer")).thenReturn(false);
11491143
EndPoint localEndPoint = Mockito.mock(EndPoint.class);
1150-
when(localEndPoint.resolve()).thenReturn(fallbackAddress);
11511144

11521145
EndPoint endpoint = handler.buildNodeEndPoint(row, null, localEndPoint);
11531146
assertThat(endpoint).isInstanceOf(ClientRoutesEndPoint.class);
11541147

1155-
// Cache is empty (no PrivateLink route) → resolves to the regular endpoint address
1156-
SocketAddress resolved = ((ClientRoutesEndPoint) endpoint).resolve();
1157-
assertThat(resolved).isEqualTo(fallbackAddress);
1158-
Mockito.verify(localEndPoint).resolve();
1148+
// Cache is empty (no route for this host_id) → must throw, not fall back
1149+
assertThatThrownBy(endpoint::resolve)
1150+
.isInstanceOf(IllegalStateException.class)
1151+
.hasMessageContaining("No client route entry found")
1152+
.hasMessageContaining(hostId.toString());
11591153
}
11601154

11611155
// ---- savePort() --------------------------------------------------------

0 commit comments

Comments
 (0)