Skip to content

Commit 76329b9

Browse files
committed
Fix LWT prepared statement routing failure in PRESERVE_REPLICA_ORDER mode
Addresses issue #830 where LWT prepared statements fail with "No node was available" error when using PRESERVE_REPLICA_ORDER routing (default in 4.19.0.5). Changes: - Enhanced newQueryPlanPreserveReplicas to include non-replica nodes with proper DC prioritization - Added fallback logic to prevent empty query plans for prepared statements - Implemented routing-key-based consistent rotation for non-replica nodes - Replaced diceRoll1d4() with universal randomNextInt(bound) method - Updated tests to validate new behavior including randomness testing The fix maintains replica order preservation while ensuring query plans always contain nodes, resolving the regression introduced in 4.19.0.5.
1 parent 93a3deb commit 76329b9

4 files changed

Lines changed: 276 additions & 42 deletions

File tree

core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java

Lines changed: 90 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@
3838
import com.datastax.oss.driver.shaded.guava.common.collect.MapMaker;
3939
import edu.umd.cs.findbugs.annotations.NonNull;
4040
import edu.umd.cs.findbugs.annotations.Nullable;
41+
import java.util.ArrayList;
4142
import java.util.BitSet;
43+
import java.util.Collections;
4244
import java.util.List;
4345
import java.util.Map;
4446
import java.util.Objects;
@@ -50,6 +52,7 @@
5052
import java.util.concurrent.ConcurrentMap;
5153
import java.util.concurrent.ThreadLocalRandom;
5254
import java.util.concurrent.atomic.AtomicLongArray;
55+
import java.util.stream.Collectors;
5356
import net.jcip.annotations.ThreadSafe;
5457
import org.slf4j.Logger;
5558
import org.slf4j.LoggerFactory;
@@ -176,19 +179,98 @@ public Queue<Node> newQueryPlan(@Nullable Request request, @Nullable Session ses
176179
}
177180

178181
/**
179-
* Builds a query plan that preserves the replica order as returned by the load balancing
180-
* strategy, while pushing non-local replicas after local ones.
182+
* Builds a query plan that preserves replica order: local replicas, remote replicas, local
183+
* non-replicas (rotated), remote non-replicas (rotated).
181184
*/
182185
@NonNull
183186
public Queue<Node> newQueryPlanPreserveReplicas(
184187
@Nullable Request request, @Nullable Session session) {
185188
List<Node> replicas = getReplicas(request, session);
186189
String localDc = getLocalDatacenter();
187-
if (localDc == null || replicas.isEmpty()) {
188-
return new SimpleQueryPlan(replicas.toArray());
190+
List<Node> queryPlan = new ArrayList<>();
191+
192+
// Collect all nodes by DC
193+
Map<String, List<Node>> nodesByDc = getAllNodesByDc();
194+
List<Node> allNodes =
195+
nodesByDc.values().stream().flatMap(List::stream).collect(Collectors.toList());
196+
197+
if (localDc == null) {
198+
// No local DC: all replicas first, then rotated non-replicas
199+
queryPlan.addAll(replicas);
200+
addRotatedNonReplicas(queryPlan, allNodes, replicas, request);
201+
} else {
202+
// With local DC: prioritize local, then remote
203+
addReplicasByDc(queryPlan, replicas, localDc);
204+
addNonReplicasByDc(queryPlan, nodesByDc, replicas, localDc, request);
189205
}
190206

191-
return new SimpleQueryPlan(moveNonLocalReplicasToTheEnd(replicas, localDc));
207+
return new SimpleQueryPlan(queryPlan.toArray());
208+
}
209+
210+
/** Collect all live nodes grouped by DC. */
211+
private Map<String, List<Node>> getAllNodesByDc() {
212+
Map<String, List<Node>> nodesByDc = new java.util.HashMap<>();
213+
for (String dc : getLiveNodes().dcs()) {
214+
List<Node> dcNodes = new ArrayList<>();
215+
for (Object obj : getLiveNodes().dc(dc).toArray()) {
216+
dcNodes.add((Node) obj);
217+
}
218+
nodesByDc.put(dc, dcNodes);
219+
}
220+
return nodesByDc;
221+
}
222+
223+
/** Add replicas with local DC first, then remote DCs. */
224+
private void addReplicasByDc(List<Node> queryPlan, List<Node> replicas, String localDc) {
225+
replicas.stream()
226+
.filter(r -> Objects.equals(r.getDatacenter(), localDc))
227+
.forEach(queryPlan::add);
228+
replicas.stream()
229+
.filter(r -> !Objects.equals(r.getDatacenter(), localDc))
230+
.forEach(queryPlan::add);
231+
}
232+
233+
/** Add non-replicas with local DC first, then remote DCs (all rotated). */
234+
private void addNonReplicasByDc(
235+
List<Node> queryPlan,
236+
Map<String, List<Node>> nodesByDc,
237+
List<Node> replicas,
238+
String localDc,
239+
Request request) {
240+
// Local DC non-replicas first
241+
addRotatedNonReplicas(
242+
queryPlan, nodesByDc.getOrDefault(localDc, new ArrayList<>()), replicas, request);
243+
// Remote DC non-replicas
244+
for (Map.Entry<String, List<Node>> entry : nodesByDc.entrySet()) {
245+
if (!Objects.equals(entry.getKey(), localDc)) {
246+
addRotatedNonReplicas(queryPlan, entry.getValue(), replicas, request);
247+
}
248+
}
249+
}
250+
251+
/** Add non-replica nodes from given list with rotation. */
252+
private void addRotatedNonReplicas(
253+
List<Node> queryPlan, List<Node> nodes, List<Node> replicas, Request request) {
254+
List<Node> nonReplicas =
255+
nodes.stream().filter(n -> !replicas.contains(n)).collect(Collectors.toList());
256+
if (!nonReplicas.isEmpty()) {
257+
rotateNonReplicas(nonReplicas, request);
258+
queryPlan.addAll(nonReplicas);
259+
}
260+
}
261+
262+
/** Rotates nodes based on routing key (consistent) or randomly. */
263+
private void rotateNonReplicas(List<Node> nodes, @Nullable Request request) {
264+
if (nodes.size() <= 1) return;
265+
266+
int rotationAmount =
267+
(request != null && request.getRoutingKey() != null)
268+
? Math.abs(request.getRoutingKey().hashCode()) % nodes.size()
269+
: randomNextInt(nodes.size());
270+
271+
if (rotationAmount > 0) {
272+
Collections.rotate(nodes, -rotationAmount);
273+
}
192274
}
193275

194276
/**
@@ -329,7 +411,7 @@ private void avoidSlowReplicas(
329411
// - the replica in first or second position is the most recent replica marked as UP and
330412
// - dice roll 1d4 != 1
331413
else if ((newestUpReplica == currentNodes[0] || newestUpReplica == currentNodes[1])
332-
&& diceRoll1d4() != 1) {
414+
&& randomNextInt(4) != 1) {
333415

334416
// Send it to the back of the replicas
335417
ArrayUtils.bubbleDown(
@@ -371,8 +453,8 @@ protected long nanoTime() {
371453
}
372454

373455
/** Exposed as a protected method so that it can be accessed by tests */
374-
protected int diceRoll1d4() {
375-
return ThreadLocalRandom.current().nextInt(4);
456+
protected int randomNextInt(int bound) {
457+
return ThreadLocalRandom.current().nextInt(bound);
376458
}
377459

378460
protected boolean isUnhealthy(@NonNull Node node, @NonNull Session session, long now) {

core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DcInferringLoadBalancingPolicyQueryPlanTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ protected long nanoTime() {
4343
}
4444

4545
@Override
46-
protected int diceRoll1d4() {
46+
protected int randomNextInt(int bound) {
4747
return diceRoll;
4848
}
4949
});

0 commit comments

Comments
 (0)