Skip to content

Commit 78c7343

Browse files
committed
3.x: Route SELECT at SERIAL/LOCAL_SERIAL consistency like LWT
Fixes: https://scylladb.atlassian.net/browse/DRIVER-616 Statements with SERIAL or LOCAL_SERIAL as their main consistency level should be routed through the LWT path (PRESERVE_REPLICA_ORDER) in the same way as statements where isLWT() returns true. Changes: - TokenAwarePolicy: treat statements with SERIAL/LOCAL_SERIAL consistency as LWT for routing method selection; exclude remote DC replicas from PRESERVE_REPLICA_ORDER plans when consistency is LOCAL_SERIAL - LatencyAwarePolicy: bypass latency-based reordering for SERIAL/LOCAL_SERIAL consistency statements, same as for isLWT() statements - RackAwareRoundRobinPolicy: skip rack prioritization for SERIAL/LOCAL_SERIAL consistency statements, same as for isLWT() statements - Add unit tests in TokenAwarePolicyTest (7 new), LatencyAwarePolicyTest (1), RackAwareRoundRobinPolicyTest (1), and integration test LWTLoadBalancingIT
1 parent b666e09 commit 78c7343

7 files changed

Lines changed: 363 additions & 7 deletions

File tree

driver-core/src/main/java/com/datastax/driver/core/policies/LatencyAwarePolicy.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.codahale.metrics.Gauge;
1919
import com.datastax.driver.core.Cluster;
20+
import com.datastax.driver.core.ConsistencyLevel;
2021
import com.datastax.driver.core.Host;
2122
import com.datastax.driver.core.HostDistance;
2223
import com.datastax.driver.core.LatencyTracker;
@@ -258,10 +259,10 @@ public HostDistance distance(Host host) {
258259
*/
259260
@Override
260261
public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement) {
261-
// For LWT queries, preserve the child policy's ordering.
262+
// For LWT queries or serial consistency queries, preserve the child policy's ordering.
262263
// LWT routing can rely on deterministic replica ordering (e.g. by TokenAwarePolicy), and
263264
// latency-based reordering can undermine those assumptions.
264-
if (statement != null && statement.isLWT()) {
265+
if (statement != null && (statement.isLWT() || hasSerialConsistency(statement))) {
265266
return childPolicy.newQueryPlan(loggedKeyspace, statement);
266267
}
267268

@@ -334,6 +335,11 @@ protected Host computeNext() {
334335
};
335336
}
336337

338+
private static boolean hasSerialConsistency(Statement statement) {
339+
ConsistencyLevel cl = statement.getConsistencyLevel();
340+
return cl != null && cl.isSerial();
341+
}
342+
337343
/**
338344
* Returns a snapshot of the scores (latency averages) maintained by this policy.
339345
*

driver-core/src/main/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicy.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,8 +246,10 @@ public HostDistance distance(Host host) {
246246
@Override
247247
public Iterator<Host> newQueryPlan(String loggedKeyspace, final Statement statement) {
248248

249-
// For LWT queries, skip rack prioritization and use all local DC hosts equally
250-
final boolean isLWT = statement != null && statement.isLWT();
249+
// For LWT queries or serial consistency queries, skip rack prioritization and use all local DC
250+
// hosts equally
251+
final boolean isLWT =
252+
statement != null && (statement.isLWT() || hasSerialConsistency(statement));
251253

252254
// For LWT queries, include all local DC hosts in the first part of the plan, not just those in
253255
// the local rack
@@ -330,6 +332,11 @@ protected Host computeNext() {
330332
};
331333
}
332334

335+
private static boolean hasSerialConsistency(Statement statement) {
336+
ConsistencyLevel cl = statement.getConsistencyLevel();
337+
return cl != null && cl.isSerial();
338+
}
339+
333340
@Override
334341
public void onUp(Host host) {
335342
String dc = dc(host);

driver-core/src/main/java/com/datastax/driver/core/policies/TokenAwarePolicy.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.datastax.driver.core.Cluster;
2727
import com.datastax.driver.core.CodecRegistry;
2828
import com.datastax.driver.core.ColumnDefinitions;
29+
import com.datastax.driver.core.ConsistencyLevel;
2930
import com.datastax.driver.core.Host;
3031
import com.datastax.driver.core.HostDistance;
3132
import com.datastax.driver.core.Metadata;
@@ -266,6 +267,7 @@ private class PreserveReplicaOrderIterator extends AbstractIterator<Host> {
266267
private final List<Host> replicas;
267268
private final String keyspace;
268269
private final Statement statement;
270+
private final boolean localOnly;
269271
private List<Host> nonLocalReplicas;
270272
private Iterator<Host> nonLocalReplicasIterator;
271273
private Set<Host> returnedHosts;
@@ -276,6 +278,8 @@ public PreserveReplicaOrderIterator(String keyspace, Statement statement, List<H
276278
this.statement = statement;
277279
this.replicas = replicas;
278280
this.replicasIterator = replicas.iterator();
281+
ConsistencyLevel cl = statement.getConsistencyLevel();
282+
this.localOnly = cl == ConsistencyLevel.LOCAL_SERIAL;
279283
}
280284

281285
@Override
@@ -307,7 +311,7 @@ protected Host computeNext() {
307311
}
308312

309313
// Second pass: return remote replicas that are UP and not IGNORED
310-
if (nonLocalReplicas != null) {
314+
if (nonLocalReplicas != null && !localOnly) {
311315
if (nonLocalReplicasIterator == null) {
312316
nonLocalReplicasIterator = nonLocalReplicas.iterator();
313317
}
@@ -468,10 +472,18 @@ private ColumnDefinitions getRoutingVariables(Statement statement) {
468472
}
469473

470474
private QueryOptions.RequestRoutingMethod getRequestRouting(Statement statement) {
471-
if (!statement.isLWT() || defaultLwtRequestRoutingMethod == null) {
475+
if (defaultLwtRequestRoutingMethod == null) {
472476
return QueryOptions.RequestRoutingMethod.REGULAR;
473477
}
474-
return defaultLwtRequestRoutingMethod;
478+
if (statement.isLWT() || hasSerialConsistency(statement)) {
479+
return defaultLwtRequestRoutingMethod;
480+
}
481+
return QueryOptions.RequestRoutingMethod.REGULAR;
482+
}
483+
484+
private static boolean hasSerialConsistency(Statement statement) {
485+
ConsistencyLevel cl = statement.getConsistencyLevel();
486+
return cl != null && cl.isSerial();
475487
}
476488

477489
private Iterator<Host> newQueryPlanRegular(
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.datastax.driver.core.policies;
19+
20+
import static org.assertj.core.api.Assertions.assertThat;
21+
22+
import com.datastax.driver.core.BoundStatement;
23+
import com.datastax.driver.core.CCMConfig;
24+
import com.datastax.driver.core.CCMTestsSupport;
25+
import com.datastax.driver.core.Cluster;
26+
import com.datastax.driver.core.ConsistencyLevel;
27+
import com.datastax.driver.core.Host;
28+
import com.datastax.driver.core.PreparedStatement;
29+
import com.datastax.driver.core.ResultSet;
30+
import com.datastax.driver.core.Session;
31+
import com.datastax.driver.core.SimpleStatement;
32+
import java.net.InetSocketAddress;
33+
import java.util.HashSet;
34+
import java.util.Set;
35+
import org.testng.annotations.Test;
36+
37+
/**
38+
* Integration tests verifying that statements with SERIAL/LOCAL_SERIAL consistency level are routed
39+
* through the LWT load-balancing path (PRESERVE_REPLICA_ORDER).
40+
*/
41+
@CCMConfig(numberOfNodes = 3)
42+
public class LWTLoadBalancingIT extends CCMTestsSupport {
43+
44+
@Override
45+
public Cluster.Builder createClusterBuilder() {
46+
return Cluster.builder()
47+
.withLoadBalancingPolicy(
48+
new TokenAwarePolicy(new RoundRobinPolicy(), TokenAwarePolicy.ReplicaOrdering.RANDOM));
49+
}
50+
51+
@Override
52+
public void onTestContextInitialized() {
53+
execute("CREATE TABLE IF NOT EXISTS test_lwt (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
54+
for (int i = 0; i < 10; i++) {
55+
execute(String.format("INSERT INTO test_lwt (pk, ck, v) VALUES (%d, %d, %d)", i, 0, i));
56+
}
57+
}
58+
59+
@Test(groups = "short")
60+
public void should_route_local_serial_select_through_lwt_path() {
61+
Session session = session();
62+
63+
SimpleStatement simpleSelect =
64+
new SimpleStatement("SELECT * FROM test_lwt WHERE pk = ? AND ck = ?", 1, 0);
65+
simpleSelect.setConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL);
66+
67+
PreparedStatement preparedSelect = session.prepare(simpleSelect);
68+
BoundStatement boundSelect = preparedSelect.bind(1, 0);
69+
70+
// Verify statement properties
71+
assertThat(simpleSelect.isLWT()).isFalse();
72+
assertThat(simpleSelect.getConsistencyLevel()).isEqualTo(ConsistencyLevel.LOCAL_SERIAL);
73+
74+
// Execute multiple times and collect coordinators
75+
Set<InetSocketAddress> coordinators = new HashSet<>();
76+
for (int i = 0; i < 30; i++) {
77+
ResultSet rs = session.execute(boundSelect);
78+
Host coordinator = rs.getExecutionInfo().getQueriedHost();
79+
assertThat(coordinator).isNotNull();
80+
coordinators.add(coordinator.getEndPoint().resolve());
81+
}
82+
83+
// With PRESERVE_REPLICA_ORDER routing, coordinators should be consistent
84+
// (always the same replica for the same partition key)
85+
assertThat(coordinators).isNotEmpty();
86+
}
87+
88+
@Test(groups = "short")
89+
public void should_route_serial_select_through_lwt_path() {
90+
Session session = session();
91+
92+
SimpleStatement simpleSelect =
93+
new SimpleStatement("SELECT * FROM test_lwt WHERE pk = ? AND ck = ?", 2, 0);
94+
simpleSelect.setConsistencyLevel(ConsistencyLevel.SERIAL);
95+
96+
PreparedStatement preparedSelect = session.prepare(simpleSelect);
97+
BoundStatement boundSelect = preparedSelect.bind(2, 0);
98+
99+
// Execute and verify it succeeds
100+
ResultSet rs = session.execute(boundSelect);
101+
Host coordinator = rs.getExecutionInfo().getQueriedHost();
102+
assertThat(coordinator).isNotNull();
103+
}
104+
}

driver-core/src/test/java/com/datastax/driver/core/policies/LatencyAwarePolicyTest.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,4 +227,46 @@ public boolean isLWT() {
227227
cluster.close();
228228
}
229229
}
230+
231+
@Test(groups = "short")
232+
public void should_not_reorder_query_plan_for_serial_consistency_queries() throws Exception {
233+
// given
234+
String query = "SELECT foo FROM bar";
235+
primingClient.prime(queryBuilder().withQuery(query).build());
236+
237+
LatencyAwarePolicy latencyAwarePolicy =
238+
LatencyAwarePolicy.builder(new RoundRobinPolicy()).withMininumMeasurements(1).build();
239+
240+
Cluster.Builder builder = super.createClusterBuilder();
241+
builder.withLoadBalancingPolicy(latencyAwarePolicy);
242+
243+
Cluster cluster = builder.build();
244+
try {
245+
cluster.init();
246+
247+
// Create a statement with LOCAL_SERIAL consistency (not isLWT)
248+
Statement serialStatement =
249+
new SimpleStatement(query)
250+
.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.LOCAL_SERIAL);
251+
252+
// Make a request to populate latency metrics
253+
LatencyTrackerBarrier barrier = new LatencyTrackerBarrier(1);
254+
cluster.register(barrier);
255+
Session session = cluster.connect();
256+
session.execute(query);
257+
barrier.await();
258+
latencyAwarePolicy.new Updater().run();
259+
260+
// when
261+
Iterator<Host> plan1 = latencyAwarePolicy.newQueryPlan("ks", serialStatement);
262+
Iterator<Host> plan2 = latencyAwarePolicy.newQueryPlan("ks", serialStatement);
263+
264+
// then: ordering is preserved (not reordered by latency)
265+
Host host = retrieveSingleHost(cluster);
266+
assertThat(Lists.newArrayList(plan1)).containsExactly(host);
267+
assertThat(Lists.newArrayList(plan2)).containsExactly(host);
268+
} finally {
269+
cluster.close();
270+
}
271+
}
230272
}

driver-core/src/test/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicyTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1214,6 +1214,38 @@ public void should_handle_null_statement() {
12141214
Assertions.assertThat(queryPlan.subList(0, 2)).containsOnly(host1, host2);
12151215
}
12161216

1217+
/**
1218+
* Ensures that {@link RackAwareRoundRobinPolicy} skips rack prioritization for serial consistency
1219+
* queries (LOCAL_SERIAL/SERIAL), treating them like LWT queries.
1220+
*
1221+
* @test_category load_balancing:rack_aware
1222+
*/
1223+
@Test(groups = "unit")
1224+
public void should_skip_rack_prioritization_for_serial_consistency_queries() {
1225+
// given: a policy with 4 local DC hosts (2 in local rack, 2 in remote rack)
1226+
RackAwareRoundRobinPolicy policy =
1227+
new RackAwareRoundRobinPolicy("localDC", "localRack", 1, false, false, false);
1228+
policy.init(cluster, ImmutableList.of(host3, host1, host4, host2, host5, host6));
1229+
1230+
// Create a non-LWT statement with LOCAL_SERIAL consistency
1231+
Statement serialStatement = mock(Statement.class);
1232+
when(serialStatement.isLWT()).thenReturn(false);
1233+
when(serialStatement.getConsistencyLevel()).thenReturn(ConsistencyLevel.LOCAL_SERIAL);
1234+
1235+
// when: generating query plans
1236+
policy.index.set(0);
1237+
List<Host> queryPlan = Lists.newArrayList(policy.newQueryPlan("keyspace", serialStatement));
1238+
1239+
// then: all 4 local DC hosts should appear before any remote DC host (no rack prioritization)
1240+
List<Host> localHosts =
1241+
queryPlan.stream()
1242+
.filter(h -> h == host1 || h == host2 || h == host3 || h == host4)
1243+
.collect(Collectors.toList());
1244+
Assertions.assertThat(localHosts).containsOnly(host1, host2, host3, host4);
1245+
// then: should follow insertion order, not rack-aware order
1246+
Assertions.assertThat(localHosts).startsWith(host3);
1247+
}
1248+
12171249
@DataProvider(name = "distanceTestCases")
12181250
public Object[][] distanceTestCases() {
12191251
return new Object[][] {

0 commit comments

Comments
 (0)