Skip to content

Commit a57613d

Browse files
committed
fix: Include non-replica nodes in LWT PRESERVE_REPLICA_ORDER query plan
The PreserveReplicaOrderIterator in TokenAwarePolicy previously returned only replica nodes in the query plan. When replicas were unavailable (e.g. prepared statements before parameter binding, or replicas going down after plan construction), the query plan could be empty or insufficient, causing "No node was available" errors. The fix adds a third pass that appends non-replica nodes from the child policy after all replicas have been returned. When no replicas are available at all (all DOWN/IGNORED), the full child policy fallback is preserved. Fixes #833
1 parent 69065f4 commit a57613d

2 files changed

Lines changed: 77 additions & 41 deletions

File tree

driver-core/src/main/java/com/datastax/driver/core/policies/TokenAwarePolicy.java

Lines changed: 46 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,10 @@
3838
import java.util.ArrayList;
3939
import java.util.Collection;
4040
import java.util.Collections;
41+
import java.util.HashSet;
4142
import java.util.Iterator;
4243
import java.util.List;
44+
import java.util.Set;
4345
import java.util.concurrent.ThreadLocalRandom;
4446

4547
/**
@@ -70,20 +72,18 @@
7072
* <h3>Lightweight Transaction (LWT) Routing</h3>
7173
*
7274
* <p>For {@linkplain Statement#isLWT() lightweight transaction} queries, this policy provides
73-
* specialized replica-only routing to optimize LWT performance and avoid contention. When LWT
74-
* routing is enabled (the default), the query plan contains <strong>only replicas</strong> for the
75-
* target partition, ordered by datacenter locality:
75+
* specialized routing to optimize LWT performance and avoid contention. When LWT routing is enabled
76+
* (the default), the query plan prioritizes replicas for the target partition, ordered by
77+
* datacenter locality, followed by non-replica nodes for failover:
7678
*
7779
* <ul>
7880
* <li>Local replicas first: replicas for which the child policy reports {@link HostDistance#LOCAL
7981
* LOCAL} distance are returned first, in the order provided by cluster metadata (preserving
8082
* primary replica ordering from the token ring).
8183
* <li>Remote replicas second: remaining replicas (typically in remote datacenters) are appended,
8284
* but only if they are up and not ignored by the child policy.
83-
* <li>Replica-only routing when possible: under normal conditions, LWT query plans target only
84-
* replicas for the partition in order to reduce coordinator forwarding overhead and improve
85-
* performance. When replica information is unavailable, the driver falls back to the child
86-
* policy as described in the fallback behavior below, which may include non-replica hosts.
85+
* <li>Non-replica nodes: remaining nodes from the child policy's query plan are appended after
86+
* all replicas, ensuring the query plan always includes all available nodes for failover.
8787
* </ul>
8888
*
8989
* <p><strong>Rack awareness</strong> is intentionally <em>not</em> applied to LWT replica ordering.
@@ -243,36 +243,38 @@ protected Host computeNext() {
243243

244244
/**
245245
* An iterator that returns replicas first, with local replicas prioritized (preserving primary
246-
* replica order), then remote replicas. Used for LWT queries to ensure replica-only routing and
247-
* minimize coordinator forwarding overhead. DOWN and IGNORED hosts are filtered out.
246+
* replica order), then remote replicas, then non-replica nodes from the child policy. DOWN and
247+
* IGNORED hosts are filtered out from replicas.
248248
*
249-
* <p>Query plan follows a three-pass strategy:
249+
* <p>Query plan follows a four-pass strategy:
250250
*
251251
* <ol>
252252
* <li><strong>Local replicas:</strong> Returns UP replicas marked as LOCAL by the child policy,
253253
* in the order provided by cluster metadata (preserving primary replica order).
254254
* <li><strong>Remote replicas:</strong> Returns UP replicas marked as REMOTE by the child
255255
* policy.
256-
* <li><strong>Child policy fallback:</strong> If no suitable replicas are available (for
257-
* example, all are DOWN or IGNORED and thus none are returned), falls back to the child
258-
* policy's query plan for the remaining hosts. The child policy's plan is used as-is and
259-
* may include hosts that were already considered by this iterator.
256+
* <li><strong>Non-replica nodes:</strong> Returns remaining nodes from the child policy's query
257+
* plan, skipping any hosts already returned as replicas. This ensures all available nodes
258+
* are included in the query plan for failover.
259+
* <li><strong>Child policy fallback:</strong> If no suitable replicas were returned at all (for
260+
* example, all are DOWN or IGNORED), falls back to the child policy's full query plan.
260261
* </ol>
261262
*/
262263
private class PreserveReplicaOrderIterator extends AbstractIterator<Host> {
263264
private final Iterator<Host> replicasIterator;
265+
private final List<Host> replicas;
264266
private final String keyspace;
265267
private final Statement statement;
266268
private List<Host> nonLocalReplicas;
267269
private Iterator<Host> nonLocalReplicasIterator;
268-
private boolean hasReturnedReplicas;
270+
private Set<Host> returnedHosts;
269271
private Iterator<Host> childIterator;
270272

271-
public PreserveReplicaOrderIterator(
272-
String keyspace, Statement statement, Iterator<Host> replicasIterator) {
273+
public PreserveReplicaOrderIterator(String keyspace, Statement statement, List<Host> replicas) {
273274
this.keyspace = keyspace;
274275
this.statement = statement;
275-
this.replicasIterator = replicasIterator;
276+
this.replicas = replicas;
277+
this.replicasIterator = replicas.iterator();
276278
}
277279

278280
@Override
@@ -289,7 +291,8 @@ protected Host computeNext() {
289291

290292
switch (distance) {
291293
case LOCAL:
292-
hasReturnedReplicas = true;
294+
if (returnedHosts == null) returnedHosts = new HashSet<>();
295+
returnedHosts.add(host);
293296
return host;
294297
case REMOTE:
295298
// Collect remote replicas for second pass
@@ -307,21 +310,31 @@ protected Host computeNext() {
307310
if (nonLocalReplicasIterator == null) {
308311
nonLocalReplicasIterator = nonLocalReplicas.iterator();
309312
}
310-
if (nonLocalReplicasIterator.hasNext()) {
311-
hasReturnedReplicas = true;
312-
return nonLocalReplicasIterator.next();
313+
while (nonLocalReplicasIterator.hasNext()) {
314+
Host host = nonLocalReplicasIterator.next();
315+
if (returnedHosts == null) returnedHosts = new HashSet<>();
316+
returnedHosts.add(host);
317+
return host;
313318
}
314319
}
315320

316-
// Third pass: fallback to child policy if no suitable replicas were returned
317-
// This handles cases where all replicas are empty, DOWN or IGNORED
318-
if (!hasReturnedReplicas) {
319-
if (childIterator == null) {
320-
childIterator = childPolicy.newQueryPlan(keyspace, statement);
321+
// Third pass: return remaining nodes from child policy
322+
if (childIterator == null) {
323+
childIterator = childPolicy.newQueryPlan(keyspace, statement);
324+
}
325+
while (childIterator.hasNext()) {
326+
Host host = childIterator.next();
327+
// Skip hosts we already returned as replicas
328+
if (returnedHosts != null && returnedHosts.contains(host)) {
329+
continue;
321330
}
322-
if (childIterator.hasNext()) {
323-
return childIterator.next();
331+
// If we returned some replicas, skip remaining replicas from child policy
332+
// to avoid duplicates. If no replicas were returned (all DOWN/IGNORED),
333+
// allow full child policy fallback including replica hosts.
334+
if (returnedHosts != null && replicas.contains(host)) {
335+
continue;
324336
}
337+
return host;
325338
}
326339

327340
return endOfData();
@@ -477,7 +490,10 @@ private Iterator<Host> newQueryPlanRegular(
477490

478491
private Iterator<Host> newQueryPlanPreserveReplicaOrder(
479492
String keyspace, Statement statement, List<Host> replicas) {
480-
return new PreserveReplicaOrderIterator(keyspace, statement, replicas.iterator());
493+
if (replicas.isEmpty()) {
494+
return childPolicy.newQueryPlan(keyspace, statement);
495+
}
496+
return new PreserveReplicaOrderIterator(keyspace, statement, replicas);
481497
}
482498

483499
@Override

driver-core/src/test/java/com/datastax/driver/core/policies/TokenAwarePolicyTest.java

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -163,15 +163,17 @@ public void should_prioritize_local_replicas_for_lwt(TokenAwarePolicy.ReplicaOrd
163163
when(lwtStatement.getKeyspace()).thenReturn(KEYSPACE);
164164
when(childPolicy.distance(host1)).thenReturn(HostDistance.REMOTE);
165165
when(childPolicy.distance(host2)).thenReturn(HostDistance.LOCAL);
166+
when(childPolicy.newQueryPlan(KEYSPACE, lwtStatement))
167+
.thenReturn(Lists.newArrayList(host4, host3, host2, host1).iterator());
166168

167169
TokenAwarePolicy policy = new TokenAwarePolicy(childPolicy, ordering);
168170
policy.init(cluster, null);
169171

170172
// when
171173
Iterator<Host> queryPlan = policy.newQueryPlan(KEYSPACE, lwtStatement);
172174

173-
// then: local replica first, then remaining replicas only
174-
assertThat(queryPlan).containsExactly(host2, host1);
175+
// then: local replica first, remote replica, then non-replicas from child policy
176+
assertThat(queryPlan).containsExactly(host2, host1, host4, host3);
175177
}
176178

177179
@Test(groups = "unit", dataProvider = "shuffleProvider")
@@ -184,15 +186,17 @@ public void should_preserve_replica_order_for_lwt(TokenAwarePolicy.ReplicaOrderi
184186
when(lwtStatement.getKeyspace()).thenReturn(KEYSPACE);
185187
when(metadata.getReplicasList(Metadata.quote(KEYSPACE), null, null, routingKey))
186188
.thenReturn(Lists.newArrayList(host2, host3, host1));
189+
when(childPolicy.newQueryPlan(KEYSPACE, lwtStatement))
190+
.thenReturn(Lists.newArrayList(host4, host3, host2, host1).iterator());
187191

188192
TokenAwarePolicy policy = new TokenAwarePolicy(childPolicy, ordering);
189193
policy.init(cluster, null);
190194

191195
// when
192196
Iterator<Host> queryPlan = policy.newQueryPlan(KEYSPACE, lwtStatement);
193197

194-
// then: replica order preserved and only replicas returned
195-
assertThat(queryPlan).containsExactly(host2, host3, host1);
198+
// then: replica order preserved, then non-replicas from child policy
199+
assertThat(queryPlan).containsExactly(host2, host3, host1, host4);
196200
}
197201

198202
@Test(groups = "unit")
@@ -241,14 +245,18 @@ public void should_filter_down_replicas_for_lwt(TokenAwarePolicy.ReplicaOrdering
241245
when(childPolicy.distance(host3)).thenReturn(HostDistance.REMOTE);
242246
when(host3.isUp()).thenReturn(false);
243247

248+
// host4 is a non-replica available via child policy
249+
when(childPolicy.newQueryPlan(KEYSPACE, lwtStatement))
250+
.thenReturn(Lists.newArrayList(host4).iterator());
251+
244252
TokenAwarePolicy policy = new TokenAwarePolicy(childPolicy, ordering);
245253
policy.init(cluster, null);
246254

247255
// when
248256
Iterator<Host> queryPlan = policy.newQueryPlan(KEYSPACE, lwtStatement);
249257

250-
// then: only UP replicas are returned (host1 and host3 are DOWN so excluded)
251-
assertThat(queryPlan).containsExactly(host2);
258+
// then: UP replicas first, then non-replicas from child policy
259+
assertThat(queryPlan).containsExactly(host2, host4);
252260
}
253261

254262
@Test(groups = "unit", dataProvider = "shuffleProvider")
@@ -274,20 +282,24 @@ public void should_filter_ignored_replicas_for_lwt(TokenAwarePolicy.ReplicaOrder
274282
when(childPolicy.distance(host3)).thenReturn(HostDistance.REMOTE);
275283
when(host3.isUp()).thenReturn(true);
276284

285+
// host4 is a non-replica available via child policy
286+
when(childPolicy.newQueryPlan(KEYSPACE, lwtStatement))
287+
.thenReturn(Lists.newArrayList(host4).iterator());
288+
277289
TokenAwarePolicy policy = new TokenAwarePolicy(childPolicy, ordering);
278290
policy.init(cluster, null);
279291

280292
// when
281293
Iterator<Host> queryPlan = policy.newQueryPlan(KEYSPACE, lwtStatement);
282294

283-
// then: IGNORED replicas are excluded (host2), local first then remote
284-
assertThat(queryPlan).containsExactly(host1, host3);
295+
// then: IGNORED replicas excluded, local first, remote, then non-replicas
296+
assertThat(queryPlan).containsExactly(host1, host3, host4);
285297
}
286298

287299
@Test(groups = "unit", dataProvider = "shuffleProvider")
288300
public void should_filter_down_and_ignored_replicas_for_lwt(
289301
TokenAwarePolicy.ReplicaOrdering ordering) {
290-
// given: LWT statement with mixed replica states
302+
// given: LWT statement with mixed replica states (all 4 hosts are replicas)
291303
Statement lwtStatement = mock(Statement.class);
292304
when(lwtStatement.isLWT()).thenReturn(true);
293305
when(lwtStatement.getRoutingKey(any(ProtocolVersion.class), any(CodecRegistry.class)))
@@ -312,6 +324,10 @@ public void should_filter_down_and_ignored_replicas_for_lwt(
312324
when(childPolicy.distance(host4)).thenReturn(HostDistance.REMOTE);
313325
when(host4.isUp()).thenReturn(true);
314326

327+
// child policy returns empty since all hosts are replicas
328+
when(childPolicy.newQueryPlan(KEYSPACE, lwtStatement))
329+
.thenReturn(Lists.<Host>newArrayList().iterator());
330+
315331
TokenAwarePolicy policy = new TokenAwarePolicy(childPolicy, ordering);
316332
policy.init(cluster, null);
317333

@@ -390,14 +406,18 @@ public void should_return_all_local_replicas_when_all_replicas_are_local(
390406
when(childPolicy.distance(host3)).thenReturn(HostDistance.LOCAL);
391407
when(host3.isUp()).thenReturn(true);
392408

409+
// host4 is a non-replica available via child policy
410+
when(childPolicy.newQueryPlan(KEYSPACE, lwtStatement))
411+
.thenReturn(Lists.newArrayList(host4).iterator());
412+
393413
TokenAwarePolicy policy = new TokenAwarePolicy(childPolicy, ordering);
394414
policy.init(cluster, null);
395415

396416
// when
397417
Iterator<Host> queryPlan = policy.newQueryPlan(KEYSPACE, lwtStatement);
398418

399-
// then: should return all local replicas without NPE (nonLocalReplicas remains null)
400-
assertThat(queryPlan).containsExactly(host1, host2, host3);
419+
// then: all local replicas first, then non-replicas
420+
assertThat(queryPlan).containsExactly(host1, host2, host3, host4);
401421
}
402422

403423
@Test(groups = "unit", dataProvider = "shuffleProvider")

0 commit comments

Comments
 (0)