Commit a6d0e48

feat: add EndPoint.resolveAll() for multi-address DNS expansion (DRIVER-201)
Addresses the endpoint-API aspect of DRIVER-201.

Problem: EndPoint.resolve() returns a single SocketAddress. When a hostname maps to multiple IPs, the driver can only try the first one and fails with AllNodesFailedException if it is unreachable; the remaining IPs are invisible to the connection layer.

Solution (per @dkropachev's architectural direction):
- Deprecate EndPoint.resolve(). Add EndPoint.resolveAll() with a default implementation that wraps resolve() in a single-element array for backward compatibility with third-party implementations.
- DefaultEndPoint.resolveAll(): if the stored InetSocketAddress is unresolved, calls InetAddress.getAllByName() to expand the hostname to all known IPs, returning one InetSocketAddress per IP. Falls back to the single-element unresolved address if DNS fails, so the connect attempt surfaces a descriptive error rather than returning empty.
- SniEndPoint.resolveAll(): re-resolves the proxy hostname on each call and returns all A-records sorted by IP, enabling the caller to try each proxy address in sequence.
- ClientRoutesEndPoint.resolveAll(): delegates to resolve() (single-address topology-monitor lookup) and wraps in a one-element array.
- ChannelFactory.connect(): replaced endPoint.resolve() with endPoint.resolveAll(). Iterates through the returned candidates via tryNextCandidate(); on per-address failure logs and tries the next; only fails the overall resultFuture when all candidates are exhausted. Protocol-version negotiation (downgrade retries) is scoped to the same address via connectToAddress(), which is semantically correct.

Tests:
- DefaultEndPointTest: 3 new cases (already-resolved passthrough, unresolved hostname expansion, unresolvable hostname fallback).
- SniEndPointTest: new class with cases for resolveAll() happy path, unresolvable host exception, and resolve() sanity check.
- All 13 existing ChannelFactory tests continue to pass (LocalEndPoint uses the default single-element resolveAll() via the interface default).
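
The message above says SniEndPoint.resolveAll() returns the A-records "sorted by IP", but the comparator it uses is not shown here. As a rough illustration only, one way to get a deterministic order is to compare the raw address bytes as unsigned values. The class name, the comparator, and the `ipv4` helper below are hypothetical and are not taken from the driver.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Comparator;

public class SortedAddressesSketch {

  // Orders addresses by length first (IPv4 before IPv6), then by unsigned byte value.
  // This is an assumed ordering, not necessarily the one the driver applies.
  static final Comparator<InetAddress> BY_RAW_BYTES =
      (a, b) -> {
        byte[] x = a.getAddress();
        byte[] y = b.getAddress();
        if (x.length != y.length) {
          return Integer.compare(x.length, y.length);
        }
        for (int i = 0; i < x.length; i++) {
          int cmp = Integer.compare(x[i] & 0xFF, y[i] & 0xFF);
          if (cmp != 0) {
            return cmp;
          }
        }
        return 0;
      };

  // Returns a sorted copy and leaves the input array untouched.
  static InetAddress[] sorted(InetAddress[] addresses) {
    InetAddress[] copy = addresses.clone();
    Arrays.sort(copy, BY_RAW_BYTES);
    return copy;
  }

  // Hypothetical helper: builds an IPv4 address from literals without any DNS lookup.
  static InetAddress ipv4(int a, int b, int c, int d) {
    try {
      return InetAddress.getByAddress(new byte[] {(byte) a, (byte) b, (byte) c, (byte) d});
    } catch (UnknownHostException e) {
      // Only thrown for an illegal byte-array length, which cannot happen here.
      throw new IllegalArgumentException(e);
    }
  }

  public static void main(String[] args) {
    InetAddress[] input = {ipv4(192, 168, 0, 2), ipv4(10, 0, 0, 1)};
    for (InetAddress address : sorted(input)) {
      System.out.println(address.getHostAddress());
    }
  }
}
```

A stable order means every connection attempt walks the proxy addresses in the same sequence, which keeps logs comparable across retries.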
1 parent c830c20 commit a6d0e48

13 files changed

Lines changed: 436 additions & 11 deletions

core/src/main/java/com/datastax/dse/driver/api/core/auth/DseGssApiAuthProviderBase.java

Lines changed: 2 additions & 0 deletions
@@ -291,6 +291,8 @@ protected static class GssApiAuthenticator extends BaseDseAuthenticator {
     private SaslClient saslClient;
     private EndPoint endPoint;

+    @SuppressWarnings("deprecation") // resolve() is deprecated in favour of resolveAll();
+    // Kerberos authentication needs a single canonical hostname for SASL service name resolution.
     protected GssApiAuthenticator(
         GssApiOptions options, EndPoint endPoint, String serverAuthenticator) {
       super(serverAuthenticator);

core/src/main/java/com/datastax/dse/driver/internal/core/insights/InsightsClient.java

Lines changed: 6 additions & 0 deletions
@@ -288,6 +288,8 @@ private InsightsStatusData createStatusData() {
         .build();
   }

+  @SuppressWarnings("deprecation") // resolve() is deprecated in favour of resolveAll();
+  // address reporting needs a single canonical address per node.
   private Map<String, SessionStateForNode> getConnectedNodes() {
     Map<Node, ChannelPool> pools = driverContext.getPoolManager().getPools();
     return pools.entrySet().stream()
@@ -302,6 +304,8 @@ private SessionStateForNode constructSessionStateForNode(Map.Entry<Node, Channel
         entry.getKey().getOpenConnections(), entry.getValue().getInFlight());
   }

+  @SuppressWarnings("deprecation") // resolve() is deprecated in favour of resolveAll();
+  // address reporting needs a single canonical address per node.
   private InsightsStartupData createStartupData() {
     Map<String, String> startupOptions = driverContext.getStartupOptions();
     return InsightsStartupData.builder()
@@ -454,6 +458,8 @@ private PoolSizeByHostDistance getPoolSizeByHostDistance() {
         0);
   }

+  @SuppressWarnings("deprecation") // resolve() is deprecated in favour of resolveAll();
+  // address reporting needs a single canonical address for the control connection.
   private String getControlConnectionSocketAddress() {
     SocketAddress controlConnectionAddress = controlConnection.channel().getEndPoint().resolve();
     return AddressFormatter.nullSafeToString(controlConnectionAddress);

core/src/main/java/com/datastax/oss/driver/api/core/metadata/EndPoint.java

Lines changed: 33 additions & 3 deletions
@@ -18,28 +18,58 @@
 package com.datastax.oss.driver.api.core.metadata;

 import edu.umd.cs.findbugs.annotations.NonNull;
-import java.net.InetSocketAddress;
 import java.net.SocketAddress;

 /**
  * Encapsulates the information needed to open connections to a node.
  *
  * <p>By default, the driver assumes plain TCP connections, and this is just a wrapper around an
- * {@link InetSocketAddress}. However, more complex deployment scenarios might use a custom
+ * {@link java.net.InetSocketAddress}. However, more complex deployment scenarios might use a custom
  * implementation that contains additional information; for example, if the nodes are accessed
  * through a proxy with SNI routing, an SNI server name is needed in addition to the proxy address.
  */
 public interface EndPoint {

   /**
-   * Resolves this instance to a socket address.
+   * Resolves this instance to a single socket address.
    *
    * <p>This will be called each time the driver opens a new connection to the node. The returned
    * address cannot be null.
+   *
+   * @deprecated Use {@link #resolveAll()} instead. When a hostname maps to multiple IPs (e.g. in
+   *     dynamic DNS environments) only one address is returned here, causing the driver to miss
+   *     fallback IPs when the first one is unreachable. {@code resolveAll()} returns the full set.
    */
+  @Deprecated
   @NonNull
   SocketAddress resolve();

+  /**
+   * Resolves this instance to all known socket addresses.
+   *
+   * <p>This is called each time the driver opens a new connection to the node. For endpoints backed
+   * by a plain IP address the array contains exactly one element. For endpoints whose hostname
+   * resolves to multiple IPs (e.g. a DNS round-robin entry) all addresses are returned so that the
+   * driver can try each one in sequence and fall back gracefully when individual IPs are
+   * unreachable.
+   *
+   * <p>The default implementation wraps {@link #resolve()} and returns a single-element array.
+   * Implementations that can supply multiple addresses should override this method.
+   *
+   * <p>The returned array must not be null and must contain at least one element.
+   *
+   * <p><b>Timeout note:</b> {@link com.datastax.oss.driver.internal.core.channel.ChannelFactory}
+   * tries each address in sequence. If a hostname resolves to N addresses and each attempt times
+   * out, the worst-case connection time before declaring a node unreachable is {@code N ×
+   * advanced.connection.connect-timeout}. In practice DNS round-robin entries have only a small
+   * number of records, so this is rarely a concern, but callers should be aware of this when
+   * configuring connect timeouts.
+   */
+  @NonNull
+  default SocketAddress[] resolveAll() {
+    return new SocketAddress[] {resolve()};
+  }
+
   /**
    * Returns an alternate string representation for use in node-level metric names.
    *
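
The backward-compatibility claim in the Javadoc above (an implementation that only knows about `resolve()` still gets a working `resolveAll()`) can be sketched with a reduced stand-in interface. `SimpleEndPoint` and `LegacyEndPoint` are hypothetical names used for illustration; only the shape of the default method mirrors the diff.

```java
import java.net.InetSocketAddress;
import java.net.SocketAddress;

public class DefaultMethodSketch {

  /** Hypothetical stand-in for EndPoint, reduced to the two resolve methods. */
  interface SimpleEndPoint {
    SocketAddress resolve();

    // Same shape as the default in the diff: wrap the single address in an array.
    default SocketAddress[] resolveAll() {
      return new SocketAddress[] {resolve()};
    }
  }

  /** A third-party style implementation written before resolveAll() existed. */
  static class LegacyEndPoint implements SimpleEndPoint {
    private final InetSocketAddress address;

    LegacyEndPoint(InetSocketAddress address) {
      this.address = address;
    }

    @Override
    public SocketAddress resolve() {
      return address;
    }
  }

  public static void main(String[] args) {
    SimpleEndPoint endPoint =
        new LegacyEndPoint(InetSocketAddress.createUnresolved("db.example.com", 9042));
    // No override needed: the interface default supplies a one-element array.
    SocketAddress[] all = endPoint.resolveAll();
    System.out.println(all.length + " candidate: " + all[0]);
  }
}
```

Because the default lives on the interface, existing implementations compile unchanged and callers can switch to `resolveAll()` without checking which kind of endpoint they hold.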

core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java

Lines changed: 116 additions & 8 deletions
@@ -219,14 +219,119 @@ private void connect(
       List<ProtocolVersion> attemptedVersions,
       CompletableFuture<DriverChannel> resultFuture) {

-    SocketAddress resolvedAddress;
+    SocketAddress[] candidates;
     try {
-      resolvedAddress = endPoint.resolve();
+      candidates = endPoint.resolveAll();
+      if (candidates == null || candidates.length == 0) {
+        resultFuture.completeExceptionally(
+            new IllegalArgumentException(
+                "EndPoint.resolveAll() must return a non-null, non-empty array: " + endPoint));
+        return;
+      }
     } catch (Exception e) {
       resultFuture.completeExceptionally(e);
       return;
     }

+    tryNextCandidate(
+        endPoint,
+        shardingInfo,
+        shardId,
+        options,
+        nodeMetricUpdater,
+        currentVersion,
+        isNegotiating,
+        attemptedVersions,
+        resultFuture,
+        candidates,
+        0);
+  }
+
+  /**
+   * Iterates through the candidate addresses from {@link EndPoint#resolveAll()}. Tries each one in
+   * sequence; if an address fails for any reason (including protocol-version negotiation
+   * exhaustion on that address), the next candidate is tried. Only when all candidates are
+   * exhausted is the overall {@code resultFuture} failed, with the last candidate's error.
+   *
+   * <p><b>Timeout note:</b> addresses are tried serially, so the worst-case time before failure is
+   * {@code N × connect-timeout} where N is the number of candidates. This is an intentional
+   * tradeoff: failing immediately on the first unreachable IP would prevent fallback to healthy
+   * ones. In practice DNS entries have only a small number of records.
+   */
+  private void tryNextCandidate(
+      EndPoint endPoint,
+      NodeShardingInfo shardingInfo,
+      Integer shardId,
+      DriverChannelOptions options,
+      NodeMetricUpdater nodeMetricUpdater,
+      ProtocolVersion currentVersion,
+      boolean isNegotiating,
+      List<ProtocolVersion> attemptedVersions,
+      CompletableFuture<DriverChannel> resultFuture,
+      SocketAddress[] candidates,
+      int index) {
+
+    SocketAddress candidate = candidates[index];
+    CompletableFuture<DriverChannel> perAddressFuture = new CompletableFuture<>();
+    connectToAddress(
+        endPoint,
+        shardingInfo,
+        shardId,
+        options,
+        nodeMetricUpdater,
+        currentVersion,
+        isNegotiating,
+        attemptedVersions,
+        perAddressFuture,
+        candidate);
+
+    perAddressFuture.whenComplete(
+        (channel, error) -> {
+          if (error == null) {
+            resultFuture.complete(channel);
+          } else if (index + 1 < candidates.length) {
+            LOG.debug(
+                "[{}] Failed to connect to {} ({}), trying next address",
+                logPrefix,
+                candidate,
+                error.getMessage());
+            tryNextCandidate(
+                endPoint,
+                shardingInfo,
+                shardId,
+                options,
+                nodeMetricUpdater,
+                currentVersion,
+                isNegotiating,
+                attemptedVersions,
+                resultFuture,
+                candidates,
+                index + 1);
+          } else {
+            // Note: might be completed already if the failure happened in initializer()
+            resultFuture.completeExceptionally(error);
+          }
+        });
+  }
+
+  /**
+   * Performs a Netty bootstrap connect to a single, already-resolved address. Handles
+   * protocol-version negotiation (downgrade retries) internally, staying on the same address. Uses
+   * {@code perAddressFuture} so {@link #tryNextCandidate} can distinguish a per-address TCP failure
+   * (try the next IP) from a successful protocol handshake.
+   */
+  private void connectToAddress(
+      EndPoint endPoint,
+      NodeShardingInfo shardingInfo,
+      Integer shardId,
+      DriverChannelOptions options,
+      NodeMetricUpdater nodeMetricUpdater,
+      ProtocolVersion currentVersion,
+      boolean isNegotiating,
+      List<ProtocolVersion> attemptedVersions,
+      CompletableFuture<DriverChannel> perAddressFuture,
+      SocketAddress resolvedAddress) {
+
     NettyOptions nettyOptions = context.getNettyOptions();

     Bootstrap bootstrap =
@@ -235,7 +340,8 @@ private void connect(
             .channel(nettyOptions.channelClass())
             .option(ChannelOption.ALLOCATOR, nettyOptions.allocator())
             .handler(
-                initializer(endPoint, currentVersion, options, nodeMetricUpdater, resultFuture));
+                initializer(
+                    endPoint, currentVersion, options, nodeMetricUpdater, perAddressFuture));

     nettyOptions.afterBootstrapInitialized(bootstrap);

@@ -294,7 +400,7 @@ private void connect(
                             ConsistencyLevel.LOCAL_QUORUM.name()));
               }
             }
-            resultFuture.complete(driverChannel);
+            perAddressFuture.complete(driverChannel);
           } else {
             Throwable error = connectFuture.cause();
             if (error instanceof UnsupportedProtocolVersionException && isNegotiating) {
@@ -307,7 +413,8 @@ private void connect(
                     logPrefix,
                     currentVersion,
                     downgraded.get());
-                connect(
+                // Stay on the same address for protocol-version downgrade retries.
+                connectToAddress(
                     endPoint,
                     shardingInfo,
                     shardId,
@@ -316,16 +423,17 @@ private void connect(
                     downgraded.get(),
                     true,
                     attemptedVersions,
-                    resultFuture);
+                    perAddressFuture,
+                    resolvedAddress);
               } else {
-                resultFuture.completeExceptionally(
+                perAddressFuture.completeExceptionally(
                     UnsupportedProtocolVersionException.forNegotiation(
                         endPoint, attemptedVersions));
               }
             } else {
               // Note: might be completed already if the failure happened in initializer(), this is
               // fine
-              resultFuture.completeExceptionally(error);
+              perAddressFuture.completeExceptionally(error);
             }
           }
         });
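
The serial fallback that tryNextCandidate() implements can be reduced to a small, driver-independent sketch: one overall future, one future per attempt, and a recursive step that moves to the next candidate on failure. Everything below is hypothetical scaffolding (`connectFirstReachable`, `tryNext`, `fakeConnect`, string "channels"); it only assumes the control flow shown in the diff, not the driver's types.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class SequentialFallbackSketch {

  // Tries candidates in order; completes with the first success or the last failure.
  static <A, C> CompletableFuture<C> connectFirstReachable(
      List<A> candidates, Function<A, CompletableFuture<C>> connectOne) {
    if (candidates.isEmpty()) {
      throw new IllegalArgumentException("at least one candidate is required");
    }
    CompletableFuture<C> result = new CompletableFuture<>();
    tryNext(candidates, 0, connectOne, result);
    return result;
  }

  private static <A, C> void tryNext(
      List<A> candidates,
      int index,
      Function<A, CompletableFuture<C>> connectOne,
      CompletableFuture<C> result) {
    connectOne
        .apply(candidates.get(index))
        .whenComplete(
            (channel, error) -> {
              if (error == null) {
                result.complete(channel);
              } else if (index + 1 < candidates.size()) {
                // Per-address failure: move on instead of failing the overall future.
                tryNext(candidates, index + 1, connectOne, result);
              } else {
                result.completeExceptionally(error);
              }
            });
  }

  // Hypothetical connector: only the given address "accepts" the connection.
  static Function<String, CompletableFuture<String>> fakeConnect(String reachable) {
    return address -> {
      CompletableFuture<String> attempt = new CompletableFuture<>();
      if (address.equals(reachable)) {
        attempt.complete("channel to " + address);
      } else {
        attempt.completeExceptionally(new IOException("unreachable: " + address));
      }
      return attempt;
    };
  }

  public static void main(String[] args) {
    List<String> candidates = Arrays.asList("10.0.0.1", "10.0.0.2", "10.0.0.3");
    // Prints "channel to 10.0.0.3" after the first two attempts fail.
    System.out.println(connectFirstReachable(candidates, fakeConnect("10.0.0.3")).join());
  }
}
```

Since attempts never overlap, each failed candidate adds up to one connect timeout before the next is tried, which is the N × connect-timeout worst case the Javadoc mentions.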

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/ClientRoutesEndPoint.java

Lines changed: 14 additions & 0 deletions
@@ -76,6 +76,20 @@ public SocketAddress resolve() {
     return fallbackEndPoint.resolve();
   }

+  /**
+   * Returns all socket addresses for this endpoint.
+   *
+   * <p>Delegates to {@link #resolve()} to obtain the single address provided by the topology
+   * monitor (or the fallback endpoint), then returns it as a one-element array. The topology
+   * monitor resolves each node to exactly one address by design (via a per-host-id lookup), so
+   * multi-address expansion is not applicable here.
+   */
+  @NonNull
+  @Override
+  public SocketAddress[] resolveAll() {
+    return new SocketAddress[] {resolve()};
+  }
+
   @Override
   public boolean equals(Object other) {
     if (other == this) {

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultEndPoint.java

Lines changed: 44 additions & 0 deletions
@@ -20,7 +20,10 @@
 import com.datastax.oss.driver.api.core.metadata.EndPoint;
 import edu.umd.cs.findbugs.annotations.NonNull;
 import java.io.Serializable;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
 import java.util.Objects;

 public class DefaultEndPoint implements EndPoint, Serializable {
@@ -41,6 +44,47 @@ public InetSocketAddress resolve() {
     return address;
   }

+  /**
+   * Returns all socket addresses for this endpoint.
+   *
+   * <p>If the stored address is unresolved (i.e. the driver was configured with {@code
+   * RESOLVE_CONTACT_POINTS=false} and the hostname has not been looked up yet), this method calls
+   * {@link InetAddress#getAllByName(String)} to expand the hostname to every IP it resolves to.
+   * Each resolved IP is returned as an {@link InetSocketAddress} with the same port as the
+   * original. If the hostname resolves to only one IP, or if the address is already resolved, a
+   * single-element array is returned.
+   *
+   * <p>If DNS resolution fails, falls back to a single-element array containing {@link #resolve()}.
+   *
+   * <p><b>Note on resolver:</b> DNS lookup is performed via {@link
+   * InetAddress#getAllByName(String)} on the calling thread, bypassing any custom Netty {@code
+   * AddressResolverGroup} configured via {@link
+   * com.datastax.oss.driver.api.core.config.DefaultDriverOption#NETTY_ADMIN_SIZE}. This is
+   * consistent with how {@link SniEndPoint} and {@link
+   * com.datastax.oss.driver.internal.core.metadata.MetadataManager#getResolvedContactPoints()}
+   * perform DNS resolution elsewhere in the driver. Users who rely on a custom Netty resolver
+   * should supply pre-resolved {@link java.net.InetSocketAddress} instances instead of hostnames.
+   */
+  @NonNull
+  @Override
+  public SocketAddress[] resolveAll() {
+    if (!address.isUnresolved()) {
+      return new SocketAddress[] {address};
+    }
+    try {
+      InetAddress[] all = InetAddress.getAllByName(address.getHostString());
+      SocketAddress[] result = new SocketAddress[all.length];
+      for (int i = 0; i < all.length; i++) {
+        result[i] = new InetSocketAddress(all[i], address.getPort());
+      }
+      return result;
+    } catch (UnknownHostException e) {
+      // Fallback: return the single unresolved address; the connect attempt will fail with a
+      // descriptive error rather than silently returning an empty array.
+      return new SocketAddress[] {address};
+    }
+  }
+
   @Override
   public boolean equals(Object other) {
     if (other == this) {
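
The Javadoc above advises users with a custom resolver to supply pre-resolved `InetSocketAddress` instances instead of hostnames. A minimal sketch of that idea, using only JDK types, might look as follows. The `preResolve` method, the `staticResolver` stand-in, and the `ipv4` helper are hypothetical; how the resulting list is passed to the driver (for example as contact points on the session builder) is left out and should be checked against the driver's own documentation.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class PreResolvedContactPointsSketch {

  // Expands one hostname into resolved socket addresses using a caller-supplied resolver.
  static List<InetSocketAddress> preResolve(
      String hostname, int port, Function<String, List<InetAddress>> resolver) {
    List<InetSocketAddress> contactPoints = new ArrayList<>();
    for (InetAddress ip : resolver.apply(hostname)) {
      // Constructing from an InetAddress yields an already-resolved socket address.
      contactPoints.add(new InetSocketAddress(ip, port));
    }
    return contactPoints;
  }

  // Hypothetical helper: builds an IPv4 address from literals without any DNS lookup.
  static InetAddress ipv4(int a, int b, int c, int d) {
    try {
      return InetAddress.getByAddress(new byte[] {(byte) a, (byte) b, (byte) c, (byte) d});
    } catch (UnknownHostException e) {
      // Only thrown for an illegal byte-array length, which cannot happen here.
      throw new IllegalArgumentException(e);
    }
  }

  // Stand-in for a custom resolver; a real one would query its own DNS source.
  static List<InetAddress> staticResolver(String hostname) {
    return Arrays.asList(ipv4(10, 0, 0, 1), ipv4(10, 0, 0, 2));
  }

  public static void main(String[] args) {
    List<InetSocketAddress> contactPoints =
        preResolve("db.example.com", 9042, PreResolvedContactPointsSketch::staticResolver);
    for (InetSocketAddress contactPoint : contactPoints) {
      System.out.println(
          contactPoint.getAddress().getHostAddress()
              + ":"
              + contactPoint.getPort()
              + " unresolved="
              + contactPoint.isUnresolved());
    }
  }
}
```

With addresses resolved up front, `resolveAll()` as shown in the diff takes the `!address.isUnresolved()` branch and never calls `InetAddress.getAllByName()`.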

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java

Lines changed: 4 additions & 0 deletions
@@ -701,6 +701,8 @@ private Optional<NodeInfo> findInPeers(
   // Current versions of Cassandra (3.11 at the time of writing), require the same port for all
   // nodes. As a consequence, the port is not stored in system tables.
   // We save it the first time we get a control connection channel.
+  @SuppressWarnings("deprecation") // resolve() is deprecated in favour of resolveAll(); a single
+  // canonical address is all that is needed here to extract the port.
   protected void savePort(DriverChannel channel) {
     if (port < 0) {
       SocketAddress address = channel.getEndPoint().resolve();
@@ -723,6 +725,8 @@ protected void savePort(DriverChannel channel) {
    * otherwise.
    */
   @Nullable
+  @SuppressWarnings("deprecation") // resolve() is deprecated in favour of resolveAll(); a single
+  // canonical address is all that is needed here for the peer-vs-local comparison.
   protected InetSocketAddress getBroadcastRpcAddress(
       @NonNull AdminRow row, @NonNull EndPoint localEndPoint) {
