Skip to content

Commit e9d38b1

Browse files
nikagradkropachev
authored andcommitted
3.x: Route SELECT at SERIAL/LOCAL_SERIAL consistency like LWT
Fixes: https://scylladb.atlassian.net/browse/DRIVER-616 Statements with SERIAL or LOCAL_SERIAL as their main consistency level should be routed through the LWT path (PRESERVE_REPLICA_ORDER) in the same way as statements where isLWT() returns true. Changes: - TokenAwarePolicy: treat statements with SERIAL/LOCAL_SERIAL consistency as LWT for routing method selection; exclude remote DC replicas from PRESERVE_REPLICA_ORDER plans when consistency is LOCAL_SERIAL - LatencyAwarePolicy: bypass latency-based reordering for SERIAL/LOCAL_SERIAL consistency statements, same as for isLWT() statements - RackAwareRoundRobinPolicy: skip rack prioritization for SERIAL/LOCAL_SERIAL consistency statements, same as for isLWT() statements - Add unit tests in TokenAwarePolicyTest (7 new), LatencyAwarePolicyTest (1), RackAwareRoundRobinPolicyTest (1), and integration test LWTLoadBalancingIT
1 parent 43de6b3 commit e9d38b1

8 files changed

Lines changed: 416 additions & 9 deletions

File tree

driver-core/src/main/java/com/datastax/driver/core/Statement.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,27 @@ public boolean isLWT() {
9191
return false;
9292
}
9393

94+
/**
95+
* Returns {@code true} if this statement's effective consistency level is serial ({@link
96+
* ConsistencyLevel#SERIAL} or {@link ConsistencyLevel#LOCAL_SERIAL}). When no consistency level
97+
* has been explicitly set on this statement, falls back to the provided default.
98+
*
99+
* <p>This is used by load-balancing policies to route serial-consistency statements through the
100+
* LWT path, even when {@link #isLWT()} returns {@code false}.
101+
*
102+
* @param defaultConsistencyLevel the cluster-wide default consistency level to fall back to when
103+
* this statement has no explicit consistency level set.
104+
* @return whether the effective consistency level is serial.
105+
*/
106+
public static boolean hasSerialConsistency(
107+
Statement statement, ConsistencyLevel defaultConsistencyLevel) {
108+
ConsistencyLevel cl = statement.getConsistencyLevel();
109+
if (cl == null) {
110+
cl = defaultConsistencyLevel;
111+
}
112+
return cl != null && cl.isSerial();
113+
}
114+
94115
/**
95116
* Sets the consistency level for the query.
96117
*

driver-core/src/main/java/com/datastax/driver/core/policies/LatencyAwarePolicy.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.datastax.driver.core.LatencyTracker;
2323
import com.datastax.driver.core.Metrics;
2424
import com.datastax.driver.core.MetricsUtil;
25+
import com.datastax.driver.core.QueryOptions;
2526
import com.datastax.driver.core.Statement;
2627
import com.datastax.driver.core.exceptions.BootstrappingException;
2728
import com.datastax.driver.core.exceptions.DriverException;
@@ -102,6 +103,7 @@ public class LatencyAwarePolicy implements ChainableLoadBalancingPolicy {
102103
private final long retryPeriod;
103104
private final long minMeasure;
104105
private volatile Metrics metrics;
106+
private volatile QueryOptions queryOptions;
105107

106108
private LatencyAwarePolicy(
107109
LoadBalancingPolicy childPolicy,
@@ -218,6 +220,7 @@ public void init(Cluster cluster, Collection<Host> hosts) {
218220
}
219221
cluster.register(latencyTracker);
220222
metrics = cluster.getMetrics();
223+
queryOptions = cluster.getConfiguration().getQueryOptions();
221224
if (metrics != null) {
222225
metrics
223226
.getRegistry()
@@ -258,10 +261,13 @@ public HostDistance distance(Host host) {
258261
*/
259262
@Override
260263
public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement) {
261-
// For LWT queries, preserve the child policy's ordering.
264+
// For LWT queries or serial consistency queries, preserve the child policy's ordering.
262265
// LWT routing can rely on deterministic replica ordering (e.g. by TokenAwarePolicy), and
263266
// latency-based reordering can undermine those assumptions.
264-
if (statement != null && statement.isLWT()) {
267+
if (statement != null
268+
&& (statement.isLWT()
269+
|| Statement.hasSerialConsistency(
270+
statement, queryOptions != null ? queryOptions.getConsistencyLevel() : null))) {
265271
return childPolicy.newQueryPlan(loggedKeyspace, statement);
266272
}
267273

driver-core/src/main/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicy.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,8 +246,13 @@ public HostDistance distance(Host host) {
246246
@Override
247247
public Iterator<Host> newQueryPlan(String loggedKeyspace, final Statement statement) {
248248

249-
// For LWT queries, skip rack prioritization and use all local DC hosts equally
250-
final boolean isLWT = statement != null && statement.isLWT();
249+
// For LWT queries or serial consistency queries, skip rack prioritization and use all local DC
250+
// hosts equally
251+
ConsistencyLevel defaultCl =
252+
configuration != null ? configuration.getQueryOptions().getConsistencyLevel() : null;
253+
final boolean isLWT =
254+
statement != null
255+
&& (statement.isLWT() || Statement.hasSerialConsistency(statement, defaultCl));
251256

252257
// For LWT queries, include all local DC hosts in the first part of the plan, not just those in
253258
// the local rack

driver-core/src/main/java/com/datastax/driver/core/policies/TokenAwarePolicy.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.datastax.driver.core.Cluster;
2727
import com.datastax.driver.core.CodecRegistry;
2828
import com.datastax.driver.core.ColumnDefinitions;
29+
import com.datastax.driver.core.ConsistencyLevel;
2930
import com.datastax.driver.core.Host;
3031
import com.datastax.driver.core.HostDistance;
3132
import com.datastax.driver.core.Metadata;
@@ -266,6 +267,7 @@ private class PreserveReplicaOrderIterator extends AbstractIterator<Host> {
266267
private final List<Host> replicas;
267268
private final String keyspace;
268269
private final Statement statement;
270+
private final boolean localOnly;
269271
private List<Host> nonLocalReplicas;
270272
private Iterator<Host> nonLocalReplicasIterator;
271273
private Set<Host> returnedHosts;
@@ -276,6 +278,11 @@ public PreserveReplicaOrderIterator(String keyspace, Statement statement, List<H
276278
this.statement = statement;
277279
this.replicas = replicas;
278280
this.replicasIterator = replicas.iterator();
281+
ConsistencyLevel cl = statement.getConsistencyLevel();
282+
if (cl == null && queryOptions != null) {
283+
cl = queryOptions.getConsistencyLevel();
284+
}
285+
this.localOnly = cl == ConsistencyLevel.LOCAL_SERIAL;
279286
}
280287

281288
@Override
@@ -307,7 +314,7 @@ protected Host computeNext() {
307314
}
308315

309316
// Second pass: return remote replicas that are UP and not IGNORED
310-
if (nonLocalReplicas != null) {
317+
if (nonLocalReplicas != null && !localOnly) {
311318
if (nonLocalReplicasIterator == null) {
312319
nonLocalReplicasIterator = nonLocalReplicas.iterator();
313320
}
@@ -347,6 +354,7 @@ protected Host computeNext() {
347354
private volatile Metadata clusterMetadata;
348355
private volatile ProtocolVersion protocolVersion;
349356
private volatile CodecRegistry codecRegistry;
357+
private volatile QueryOptions queryOptions;
350358
private volatile QueryOptions.RequestRoutingMethod defaultLwtRequestRoutingMethod;
351359

352360
/**
@@ -395,8 +403,8 @@ public void init(Cluster cluster, Collection<Host> hosts) {
395403
clusterMetadata = cluster.getMetadata();
396404
protocolVersion = cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
397405
codecRegistry = cluster.getConfiguration().getCodecRegistry();
398-
defaultLwtRequestRoutingMethod =
399-
cluster.getConfiguration().getQueryOptions().getLoadBalancingLwtRequestRoutingMethod();
406+
queryOptions = cluster.getConfiguration().getQueryOptions();
407+
defaultLwtRequestRoutingMethod = queryOptions.getLoadBalancingLwtRequestRoutingMethod();
400408
childPolicy.init(cluster, hosts);
401409
}
402410

@@ -468,10 +476,14 @@ private ColumnDefinitions getRoutingVariables(Statement statement) {
468476
}
469477

470478
private QueryOptions.RequestRoutingMethod getRequestRouting(Statement statement) {
471-
if (!statement.isLWT() || defaultLwtRequestRoutingMethod == null) {
479+
if (defaultLwtRequestRoutingMethod == null) {
472480
return QueryOptions.RequestRoutingMethod.REGULAR;
473481
}
474-
return defaultLwtRequestRoutingMethod;
482+
if (statement.isLWT()
483+
|| Statement.hasSerialConsistency(statement, queryOptions.getConsistencyLevel())) {
484+
return defaultLwtRequestRoutingMethod;
485+
}
486+
return QueryOptions.RequestRoutingMethod.REGULAR;
475487
}
476488

477489
private Iterator<Host> newQueryPlanRegular(
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.datastax.driver.core.policies;
19+
20+
import static org.assertj.core.api.Assertions.assertThat;
21+
22+
import com.datastax.driver.core.BoundStatement;
23+
import com.datastax.driver.core.CCMConfig;
24+
import com.datastax.driver.core.CCMTestsSupport;
25+
import com.datastax.driver.core.Cluster;
26+
import com.datastax.driver.core.ConsistencyLevel;
27+
import com.datastax.driver.core.Host;
28+
import com.datastax.driver.core.PreparedStatement;
29+
import com.datastax.driver.core.ResultSet;
30+
import com.datastax.driver.core.Session;
31+
import com.datastax.driver.core.SimpleStatement;
32+
import java.net.InetSocketAddress;
33+
import java.util.HashSet;
34+
import java.util.Set;
35+
import org.testng.annotations.Test;
36+
37+
/**
38+
* Integration tests verifying that statements with SERIAL/LOCAL_SERIAL consistency level are routed
39+
* through the LWT load-balancing path (PRESERVE_REPLICA_ORDER).
40+
*/
41+
@CCMConfig(numberOfNodes = 3)
42+
public class LWTLoadBalancingIT extends CCMTestsSupport {
43+
44+
@Override
45+
public Cluster.Builder createClusterBuilder() {
46+
return Cluster.builder()
47+
.withLoadBalancingPolicy(
48+
new TokenAwarePolicy(new RoundRobinPolicy(), TokenAwarePolicy.ReplicaOrdering.RANDOM));
49+
}
50+
51+
@Override
52+
public void onTestContextInitialized() {
53+
execute("CREATE TABLE IF NOT EXISTS test_lwt (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
54+
for (int i = 0; i < 10; i++) {
55+
execute(String.format("INSERT INTO test_lwt (pk, ck, v) VALUES (%d, %d, %d)", i, 0, i));
56+
}
57+
}
58+
59+
@Test(groups = "short")
60+
public void should_route_local_serial_select_through_lwt_path() {
61+
Session session = session();
62+
63+
SimpleStatement simpleSelect =
64+
new SimpleStatement("SELECT * FROM test_lwt WHERE pk = ? AND ck = ?", 1, 0);
65+
simpleSelect.setConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL);
66+
67+
PreparedStatement preparedSelect = session.prepare(simpleSelect);
68+
BoundStatement boundSelect = preparedSelect.bind(1, 0);
69+
70+
// Verify statement properties
71+
assertThat(simpleSelect.isLWT()).isFalse();
72+
assertThat(simpleSelect.getConsistencyLevel()).isEqualTo(ConsistencyLevel.LOCAL_SERIAL);
73+
74+
// Execute multiple times and collect coordinators — with PRESERVE_REPLICA_ORDER routing,
75+
// the same partition key should always be routed to the same first replica.
76+
Set<InetSocketAddress> coordinators = new HashSet<>();
77+
for (int i = 0; i < 30; i++) {
78+
ResultSet rs = session.execute(boundSelect);
79+
Host coordinator = rs.getExecutionInfo().getQueriedHost();
80+
assertThat(coordinator).isNotNull();
81+
coordinators.add(coordinator.getEndPoint().resolve());
82+
}
83+
84+
// With PRESERVE_REPLICA_ORDER, the first replica is deterministic for a given partition key,
85+
// so all 30 executions should hit the same coordinator.
86+
assertThat(coordinators).hasSize(1);
87+
}
88+
89+
@Test(groups = "short")
90+
public void should_route_serial_select_through_lwt_path() {
91+
Session session = session();
92+
93+
SimpleStatement simpleSelect =
94+
new SimpleStatement("SELECT * FROM test_lwt WHERE pk = ? AND ck = ?", 2, 0);
95+
simpleSelect.setConsistencyLevel(ConsistencyLevel.SERIAL);
96+
97+
PreparedStatement preparedSelect = session.prepare(simpleSelect);
98+
BoundStatement boundSelect = preparedSelect.bind(2, 0);
99+
100+
// Execute multiple times and collect coordinators
101+
Set<InetSocketAddress> coordinators = new HashSet<>();
102+
for (int i = 0; i < 30; i++) {
103+
ResultSet rs = session.execute(boundSelect);
104+
Host coordinator = rs.getExecutionInfo().getQueriedHost();
105+
assertThat(coordinator).isNotNull();
106+
coordinators.add(coordinator.getEndPoint().resolve());
107+
}
108+
109+
// With PRESERVE_REPLICA_ORDER, the first replica is deterministic for a given partition key.
110+
assertThat(coordinators).hasSize(1);
111+
}
112+
}

driver-core/src/test/java/com/datastax/driver/core/policies/LatencyAwarePolicyTest.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,4 +227,46 @@ public boolean isLWT() {
227227
cluster.close();
228228
}
229229
}
230+
231+
@Test(groups = "short")
232+
public void should_not_reorder_query_plan_for_serial_consistency_queries() throws Exception {
233+
// given
234+
String query = "SELECT foo FROM bar";
235+
primingClient.prime(queryBuilder().withQuery(query).build());
236+
237+
LatencyAwarePolicy latencyAwarePolicy =
238+
LatencyAwarePolicy.builder(new RoundRobinPolicy()).withMininumMeasurements(1).build();
239+
240+
Cluster.Builder builder = super.createClusterBuilder();
241+
builder.withLoadBalancingPolicy(latencyAwarePolicy);
242+
243+
Cluster cluster = builder.build();
244+
try {
245+
cluster.init();
246+
247+
// Create a statement with LOCAL_SERIAL consistency (not isLWT)
248+
Statement serialStatement =
249+
new SimpleStatement(query)
250+
.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.LOCAL_SERIAL);
251+
252+
// Make a request to populate latency metrics
253+
LatencyTrackerBarrier barrier = new LatencyTrackerBarrier(1);
254+
cluster.register(barrier);
255+
Session session = cluster.connect();
256+
session.execute(query);
257+
barrier.await();
258+
latencyAwarePolicy.new Updater().run();
259+
260+
// when
261+
Iterator<Host> plan1 = latencyAwarePolicy.newQueryPlan("ks", serialStatement);
262+
Iterator<Host> plan2 = latencyAwarePolicy.newQueryPlan("ks", serialStatement);
263+
264+
// then: ordering is preserved (not reordered by latency)
265+
Host host = retrieveSingleHost(cluster);
266+
assertThat(Lists.newArrayList(plan1)).containsExactly(host);
267+
assertThat(Lists.newArrayList(plan2)).containsExactly(host);
268+
} finally {
269+
cluster.close();
270+
}
271+
}
230272
}

driver-core/src/test/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicyTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1214,6 +1214,38 @@ public void should_handle_null_statement() {
12141214
Assertions.assertThat(queryPlan.subList(0, 2)).containsOnly(host1, host2);
12151215
}
12161216

1217+
/**
1218+
* Ensures that {@link RackAwareRoundRobinPolicy} skips rack prioritization for serial consistency
1219+
* queries (LOCAL_SERIAL/SERIAL), treating them like LWT queries.
1220+
*
1221+
* @test_category load_balancing:rack_aware
1222+
*/
1223+
@Test(groups = "unit")
1224+
public void should_skip_rack_prioritization_for_serial_consistency_queries() {
1225+
// given: a policy with 4 local DC hosts (2 in local rack, 2 in remote rack)
1226+
RackAwareRoundRobinPolicy policy =
1227+
new RackAwareRoundRobinPolicy("localDC", "localRack", 1, false, false, false);
1228+
policy.init(cluster, ImmutableList.of(host3, host1, host4, host2, host5, host6));
1229+
1230+
// Create a non-LWT statement with LOCAL_SERIAL consistency
1231+
Statement serialStatement = mock(Statement.class);
1232+
when(serialStatement.isLWT()).thenReturn(false);
1233+
when(serialStatement.getConsistencyLevel()).thenReturn(ConsistencyLevel.LOCAL_SERIAL);
1234+
1235+
// when: generating query plans
1236+
policy.index.set(0);
1237+
List<Host> queryPlan = Lists.newArrayList(policy.newQueryPlan("keyspace", serialStatement));
1238+
1239+
// then: all 4 local DC hosts should appear before any remote DC host (no rack prioritization)
1240+
List<Host> localHosts =
1241+
queryPlan.stream()
1242+
.filter(h -> h == host1 || h == host2 || h == host3 || h == host4)
1243+
.collect(Collectors.toList());
1244+
Assertions.assertThat(localHosts).containsOnly(host1, host2, host3, host4);
1245+
// then: should follow insertion order, not rack-aware order
1246+
Assertions.assertThat(localHosts).startsWith(host3);
1247+
}
1248+
12171249
@DataProvider(name = "distanceTestCases")
12181250
public Object[][] distanceTestCases() {
12191251
return new Object[][] {

0 commit comments

Comments
 (0)