Skip to content

Commit 935f2dd

Browse files
committed
Start consider SERIAL SELECT as LWT queries
Driver did not consider SERIAL SELECT as LWT and therefore routed them as regular queries causing LWT congestion. Fix is to consider consistency when RequestRoutingMethod is calculated.
1 parent 441eb42 commit 935f2dd

16 files changed

Lines changed: 807 additions & 33 deletions

File tree

core/src/main/java/com/datastax/oss/driver/api/core/cql/SimpleStatement.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import com.datastax.oss.driver.api.core.CqlIdentifier;
2121
import com.datastax.oss.driver.api.core.CqlSession;
2222
import com.datastax.oss.driver.api.core.DefaultProtocolVersion;
23-
import com.datastax.oss.driver.api.core.RequestRoutingType;
2423
import com.datastax.oss.driver.api.core.context.DriverContext;
2524
import com.datastax.oss.driver.api.core.session.Request;
2625
import com.datastax.oss.driver.internal.core.cql.DefaultSimpleStatement;
@@ -86,7 +85,7 @@ static SimpleStatement newInstance(@NonNull String cqlQuery) {
8685
null,
8786
null,
8887
Statement.NO_NOW_IN_SECONDS,
89-
RequestRoutingType.REGULAR);
88+
null);
9089
}
9190

9291
/**
@@ -121,7 +120,7 @@ static SimpleStatement newInstance(
121120
null,
122121
null,
123122
Statement.NO_NOW_IN_SECONDS,
124-
RequestRoutingType.REGULAR);
123+
null);
125124
}
126125

127126
/**
@@ -153,7 +152,7 @@ static SimpleStatement newInstance(
153152
null,
154153
null,
155154
Statement.NO_NOW_IN_SECONDS,
156-
RequestRoutingType.REGULAR);
155+
null);
157156
}
158157

159158
/**

core/src/main/java/com/datastax/oss/driver/api/core/cql/StatementBuilder.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
2424
import com.datastax.oss.driver.api.core.metadata.Node;
2525
import com.datastax.oss.driver.api.core.metadata.token.Token;
26+
import com.datastax.oss.driver.internal.core.cql.RequestRoutingTypeAccessor;
2627
import com.datastax.oss.driver.internal.core.util.RoutingKey;
2728
import com.datastax.oss.protocol.internal.util.collection.NullAllowingImmutableMap;
2829
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -89,7 +90,15 @@ protected StatementBuilder(StatementT template) {
8990
this.timeout = template.getTimeout();
9091
this.node = template.getNode();
9192
this.nowInSeconds = template.getNowInSeconds();
92-
this.requestRoutingType = template.getRequestRoutingType();
93+
this.requestRoutingType = getConfiguredRequestRoutingType(template);
94+
}
95+
96+
@Nullable
97+
private RequestRoutingType getConfiguredRequestRoutingType(StatementT template) {
98+
if (template instanceof RequestRoutingTypeAccessor) {
99+
return ((RequestRoutingTypeAccessor) template).getConfiguredRequestRoutingType();
100+
}
101+
return template.getRequestRoutingType();
93102
}
94103

95104
/** @see Statement#setExecutionProfileName(String) */

core/src/main/java/com/datastax/oss/driver/internal/core/cql/Conversions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ public static DefaultPreparedStatement toPreparedStatement(
430430
context.getProtocolVersion(),
431431
lwtInfo != null && lwtInfo.isLwt(response.variablesMetadata.flags)
432432
? RequestRoutingType.LWT
433-
: RequestRoutingType.REGULAR);
433+
: null);
434434
}
435435

436436
public static ColumnDefinitions toColumnDefinitions(

core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBatchStatement.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
import org.slf4j.LoggerFactory;
5050

5151
@Immutable
52-
public class DefaultBatchStatement implements BatchStatement {
52+
public class DefaultBatchStatement implements BatchStatement, RequestRoutingTypeAccessor {
5353
private static final Logger LOG = LoggerFactory.getLogger(DefaultBatchStatement.class);
5454

5555
private final BatchType batchType;
@@ -857,6 +857,8 @@ public BatchStatement setNowInSeconds(int newNowInSeconds) {
857857
public RequestRoutingType getRequestRoutingType() {
858858
if (Objects.nonNull(requestRoutingType)) {
859859
return requestRoutingType;
860+
} else if (consistencyLevel != null && consistencyLevel.isSerial()) {
861+
return RequestRoutingType.LWT;
860862
} else if (Objects.isNull(
861863
cachedStatementsRequestRoutingType)) { // Immutability of the statement list and statements
862864
// allows us to cache the result
@@ -870,6 +872,12 @@ public RequestRoutingType getRequestRoutingType() {
870872
return cachedStatementsRequestRoutingType;
871873
}
872874

875+
@Nullable
876+
@Override
877+
public RequestRoutingType getConfiguredRequestRoutingType() {
878+
return requestRoutingType;
879+
}
880+
873881
@NonNull
874882
@Override
875883
public BatchStatement setRequestRoutingType(RequestRoutingType requestRoutingType) {

core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBoundStatement.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,10 @@
4444
import java.util.Arrays;
4545
import java.util.List;
4646
import java.util.Map;
47-
import java.util.Objects;
4847
import net.jcip.annotations.Immutable;
4948

5049
@Immutable
51-
public class DefaultBoundStatement implements BoundStatement {
50+
public class DefaultBoundStatement implements BoundStatement, RequestRoutingTypeAccessor {
5251

5352
private final PreparedStatement preparedStatement;
5453
private final ColumnDefinitions variableDefinitions;
@@ -805,9 +804,22 @@ public BoundStatement setNowInSeconds(int newNowInSeconds) {
805804
@Nullable
806805
@Override
807806
public RequestRoutingType getRequestRoutingType() {
808-
return Objects.nonNull(requestRoutingType)
809-
? requestRoutingType
810-
: preparedStatement.getRequestRoutingType();
807+
if (requestRoutingType != null) {
808+
return requestRoutingType;
809+
}
810+
if (consistencyLevel != null && consistencyLevel.isSerial()) {
811+
return RequestRoutingType.LWT;
812+
}
813+
if (preparedStatement instanceof RequestRoutingTypeAccessor) {
814+
return ((RequestRoutingTypeAccessor) preparedStatement).getConfiguredRequestRoutingType();
815+
}
816+
return preparedStatement.getRequestRoutingType();
817+
}
818+
819+
@Nullable
820+
@Override
821+
public RequestRoutingType getConfiguredRequestRoutingType() {
822+
return requestRoutingType;
811823
}
812824

813825
@NonNull

core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultPreparedStatement.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
import org.slf4j.LoggerFactory;
5959

6060
@ThreadSafe
61-
public class DefaultPreparedStatement implements PreparedStatement {
61+
public class DefaultPreparedStatement implements PreparedStatement, RequestRoutingTypeAccessor {
6262
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultPreparedStatement.class);
6363
private static final Splitter SPACE_SPLITTER = Splitter.onPattern("\\s+");
6464
private static final Splitter COMMA_SPLITTER = Splitter.onPattern(",");
@@ -196,6 +196,20 @@ public boolean isLWT() {
196196
@Nullable
197197
@Override
198198
public RequestRoutingType getRequestRoutingType() {
199+
if (requestRoutingType != null) {
200+
return requestRoutingType;
201+
}
202+
203+
if (consistencyLevelForBoundStatements != null
204+
&& consistencyLevelForBoundStatements.isSerial()) {
205+
return RequestRoutingType.LWT;
206+
}
207+
return null;
208+
}
209+
210+
@Nullable
211+
@Override
212+
public RequestRoutingType getConfiguredRequestRoutingType() {
199213
return requestRoutingType;
200214
}
201215

core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultSimpleStatement.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
import net.jcip.annotations.Immutable;
4343

4444
@Immutable
45-
public class DefaultSimpleStatement implements SimpleStatement {
45+
public class DefaultSimpleStatement implements SimpleStatement, RequestRoutingTypeAccessor {
4646

4747
private final String query;
4848
private final List<Object> positionalValues;
@@ -776,6 +776,18 @@ public SimpleStatement setNowInSeconds(int newNowInSeconds) {
776776
@Nullable
777777
@Override
778778
public RequestRoutingType getRequestRoutingType() {
779+
if (requestRoutingType != null) {
780+
return requestRoutingType;
781+
}
782+
if (consistencyLevel != null && consistencyLevel.isSerial()) {
783+
return RequestRoutingType.LWT;
784+
}
785+
return null;
786+
}
787+
788+
@Nullable
789+
@Override
790+
public RequestRoutingType getConfiguredRequestRoutingType() {
779791
return requestRoutingType;
780792
}
781793

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.datastax.oss.driver.internal.core.cql;
19+
20+
import com.datastax.oss.driver.api.core.RequestRoutingType;
21+
import edu.umd.cs.findbugs.annotations.Nullable;
22+
23+
/** Internal hook to distinguish stored routing type from consistency-inferred routing type. */
24+
public interface RequestRoutingTypeAccessor {
25+
@Nullable
26+
RequestRoutingType getConfiguredRequestRoutingType();
27+
}

core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java

Lines changed: 64 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -189,13 +189,37 @@ public RequestRoutingMethod getRequestRoutingMethod(@Nullable Request request) {
189189
if (request == null) {
190190
return RequestRoutingMethod.REGULAR;
191191
}
192-
if (request.getRequestRoutingType() == RequestRoutingType.LWT) {
192+
RequestRoutingType requestRoutingType = request.getRequestRoutingType();
193+
if (requestRoutingType == RequestRoutingType.LWT
194+
|| (requestRoutingType == null && hasSerialConsistency(request))) {
193195
return lwtRequestRoutingMethod;
194196
} else {
195197
return RequestRoutingMethod.REGULAR;
196198
}
197199
}
198200

201+
private boolean hasSerialConsistency(@NonNull Request request) {
202+
if (!(request instanceof Statement)) {
203+
return false;
204+
}
205+
206+
return getEffectiveConsistency((Statement<?>) request).isSerial();
207+
}
208+
209+
@NonNull
210+
private Optional<DriverExecutionProfile> getRequestProfile(@NonNull Request request) {
211+
DriverExecutionProfile requestProfile = request.getExecutionProfile();
212+
if (requestProfile != null) {
213+
return Optional.of(requestProfile);
214+
}
215+
216+
String profileName = request.getExecutionProfileName();
217+
if (profileName != null && !profileName.isEmpty()) {
218+
return Optional.of(context.getConfig().getProfile(profileName));
219+
}
220+
return Optional.of(profile);
221+
}
222+
199223
/**
200224
* Returns the local datacenter name, if known; empty otherwise.
201225
*
@@ -365,15 +389,27 @@ protected Queue<Node> newQueryPlanPreserveReplicas(
365389
queryPlan.addAll(replicas);
366390
addRotatedNonReplicas(queryPlan, allNodes, replicas, request);
367391
} else {
368-
// With local DC: prioritize local, then remote
369-
Map<String, List<Node>> nodesByDc = getAllNodesByDc();
392+
boolean includeRemoteDcs = isDcFailoverAllowedForRequest(request);
393+
Map<String, List<Node>> nodesByDc =
394+
includeRemoteDcs
395+
? getAllNodesByDc()
396+
: Collections.singletonMap(localDc, dcNodeList(localDc));
397+
if (!includeRemoteDcs) {
398+
replicas = filterNodesInDc(replicas, localDc);
399+
}
370400
addReplicasByDc(queryPlan, replicas, localDc);
371401
addNonReplicasByDc(queryPlan, nodesByDc, replicas, localDc, request);
372402
}
373403

374404
return new SimpleQueryPlan(queryPlan.toArray());
375405
}
376406

407+
private List<Node> filterNodesInDc(List<Node> nodes, String dc) {
408+
return nodes.stream()
409+
.filter(node -> Objects.equals(node.getDatacenter(), dc))
410+
.collect(Collectors.toList());
411+
}
412+
377413
/** Collect all live nodes grouped by DC, with preferred remote DCs ordered first. */
378414
private Map<String, List<Node>> getAllNodesByDc() {
379415
Map<String, List<Node>> nodesByDc = new LinkedHashMap<>();
@@ -537,22 +573,38 @@ protected Queue<Node> maybeAddDcFailover(@Nullable Request request, @NonNull Que
537573
if (maxNodesPerRemoteDc <= 0 || localDc == null) {
538574
return local;
539575
}
540-
if (!allowDcFailoverForLocalCl && request instanceof Statement) {
541-
Statement<?> statement = (Statement<?>) request;
542-
ConsistencyLevel consistency = statement.getConsistencyLevel();
543-
if (consistency == null) {
544-
consistency = defaultConsistencyLevel;
545-
}
546-
if (consistency.isDcLocal()) {
547-
return local;
548-
}
576+
if (!isDcFailoverAllowedForRequest(request)) {
577+
return local;
549578
}
550579
if (preferredRemoteDcs.isEmpty()) {
551580
return new CompositeQueryPlan(local, buildRemoteQueryPlanAll());
552581
}
553582
return new CompositeQueryPlan(local, buildRemoteQueryPlanPreferred());
554583
}
555584

585+
private boolean isDcFailoverAllowedForRequest(@Nullable Request request) {
586+
if (!allowDcFailoverForLocalCl && request instanceof Statement) {
587+
return !getEffectiveConsistency((Statement<?>) request).isDcLocal();
588+
}
589+
return true;
590+
}
591+
592+
@NonNull
593+
private ConsistencyLevel getEffectiveConsistency(@NonNull Statement<?> statement) {
594+
ConsistencyLevel consistency = statement.getConsistencyLevel();
595+
if (consistency != null) {
596+
return consistency;
597+
}
598+
599+
return getRequestProfile(statement)
600+
.map(
601+
requestProfile ->
602+
context
603+
.getConsistencyLevelRegistry()
604+
.nameToLevel(requestProfile.getString(DefaultDriverOption.REQUEST_CONSISTENCY)))
605+
.orElse(defaultConsistencyLevel);
606+
}
607+
556608
private QueryPlan buildRemoteQueryPlanAll() {
557609

558610
return new LazyQueryPlan() {

0 commit comments

Comments
 (0)