Skip to content

Commit a4f78b4

Browse files
committed
Start consider SERIAL SELECT as LWT queries
Driver did not consider SERIAL SELECT as LWT and therefore routed them as regular queries causing LWT congestion. Fix is to consider consistency when RequestRoutingMethod is calculated.
1 parent bd9b26a commit a4f78b4

7 files changed

Lines changed: 323 additions & 14 deletions

File tree

core/src/main/java/com/datastax/oss/driver/internal/core/cql/Conversions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ public static DefaultPreparedStatement toPreparedStatement(
430430
context.getProtocolVersion(),
431431
lwtInfo != null && lwtInfo.isLwt(response.variablesMetadata.flags)
432432
? RequestRoutingType.LWT
433-
: RequestRoutingType.REGULAR);
433+
: null);
434434
}
435435

436436
public static ColumnDefinitions toColumnDefinitions(

core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBoundStatement.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import java.util.Arrays;
4545
import java.util.List;
4646
import java.util.Map;
47-
import java.util.Objects;
4847
import net.jcip.annotations.Immutable;
4948

5049
@Immutable
@@ -805,9 +804,13 @@ public BoundStatement setNowInSeconds(int newNowInSeconds) {
805804
@Nullable
806805
@Override
807806
public RequestRoutingType getRequestRoutingType() {
808-
return Objects.nonNull(requestRoutingType)
809-
? requestRoutingType
810-
: preparedStatement.getRequestRoutingType();
807+
if (requestRoutingType != null) {
808+
return requestRoutingType;
809+
}
810+
if (consistencyLevel != null && consistencyLevel.isSerial()) {
811+
return RequestRoutingType.LWT;
812+
}
813+
return preparedStatement.getRequestRoutingType();
811814
}
812815

813816
@NonNull

core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultPreparedStatement.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,15 @@ public boolean isLWT() {
196196
@Nullable
197197
@Override
198198
public RequestRoutingType getRequestRoutingType() {
199-
return requestRoutingType;
199+
if (requestRoutingType != null) {
200+
return requestRoutingType;
201+
}
202+
203+
if (consistencyLevelForBoundStatements != null
204+
&& consistencyLevelForBoundStatements.isSerial()) {
205+
return RequestRoutingType.LWT;
206+
}
207+
return null;
200208
}
201209

202210
@Override

core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultSimpleStatement.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -776,7 +776,13 @@ public SimpleStatement setNowInSeconds(int newNowInSeconds) {
776776
@Nullable
777777
@Override
778778
public RequestRoutingType getRequestRoutingType() {
779-
return requestRoutingType;
779+
if (requestRoutingType != null) {
780+
return requestRoutingType;
781+
}
782+
if (serialConsistencyLevel != null && serialConsistencyLevel.isSerial()) {
783+
return RequestRoutingType.LWT;
784+
}
785+
return null;
780786
}
781787

782788
@NonNull

core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestRoutingTest.java

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,24 +25,33 @@
2525

2626
import static org.assertj.core.api.Assertions.assertThat;
2727
import static org.mockito.BDDMockito.given;
28+
import static org.mockito.Mockito.mock;
2829
import static org.mockito.Mockito.when;
2930

3031
import com.datastax.oss.driver.api.core.CqlIdentifier;
32+
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
3133
import com.datastax.oss.driver.api.core.RequestRoutingType;
3234
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
3335
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
36+
import com.datastax.oss.driver.api.core.cql.BoundStatement;
37+
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
38+
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
39+
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
40+
import com.datastax.oss.driver.api.core.cql.Statement;
3441
import com.datastax.oss.driver.api.core.metadata.Metadata;
3542
import com.datastax.oss.driver.api.core.metadata.Node;
3643
import com.datastax.oss.driver.api.core.metadata.TokenMap;
3744
import com.datastax.oss.driver.api.core.metadata.token.Token;
3845
import com.datastax.oss.driver.api.core.session.Request;
46+
import com.datastax.oss.driver.internal.core.cql.DefaultBoundStatement;
3947
import com.datastax.oss.driver.internal.core.loadbalancing.BasicLoadBalancingPolicy.RequestRoutingMethod;
4048
import com.datastax.oss.driver.internal.core.session.DefaultSession;
4149
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
4250
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
4351
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
4452
import com.datastax.oss.protocol.internal.util.Bytes;
4553
import java.nio.ByteBuffer;
54+
import java.util.Collections;
4655
import java.util.Optional;
4756
import java.util.Queue;
4857
import java.util.UUID;
@@ -165,6 +174,132 @@ public void should_dispatch_to_regular_query_plan_when_request_is_regular() {
165174
assertThat(plan2).containsExactlyInAnyOrder(node1, node2, node3);
166175
}
167176

177+
@Test
178+
public void
179+
should_dispatch_to_regular_query_plan_when_simple_local_serial_select_and_config_preserve() {
180+
// Given
181+
initPolicy("PRESERVE_REPLICA_ORDER");
182+
SimpleStatement statement =
183+
SimpleStatement.builder(
184+
"SELECT * FROM unique_key_value "
185+
+ "WHERE unique_key=? AND unique_value=? AND context=?")
186+
.setConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL)
187+
.setRoutingKeyspace(KEYSPACE)
188+
.setRoutingKey(ROUTING_KEY)
189+
.build();
190+
given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY))
191+
.willReturn(ImmutableList.of(node1, node2));
192+
193+
// When
194+
Queue<Node> plan = policy.newQueryPlan(statement, session);
195+
196+
// Then
197+
assertThat(statement.isLWT()).isFalse();
198+
assertThat(statement.getRequestRoutingType()).isNull();
199+
assertThat(policy.getRequestRoutingMethod(statement)).isEqualTo(RequestRoutingMethod.REGULAR);
200+
assertThat(plan).containsExactlyInAnyOrder(node1, node2, node3);
201+
}
202+
203+
@Test
204+
public void
205+
should_dispatch_to_regular_query_plan_when_prepared_statement_routing_type_is_regular() {
206+
// Given
207+
initPolicy("PRESERVE_REPLICA_ORDER");
208+
BoundStatement statement = newRegularBoundStatementWithLocalSerialConsistency();
209+
given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY))
210+
.willReturn(ImmutableList.of(node1, node2));
211+
212+
// When
213+
Queue<Node> plan = policy.newQueryPlan(statement, session);
214+
215+
// Then
216+
assertThat(statement.isLWT()).isFalse();
217+
assertThat(statement.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT);
218+
assertThat(policy.getRequestRoutingMethod(statement))
219+
.isEqualTo(RequestRoutingMethod.PRESERVE_REPLICA_ORDER);
220+
assertThat(plan).containsExactlyInAnyOrder(node1, node2, node3);
221+
}
222+
223+
@Test
224+
public void should_dispatch_to_regular_query_plan_when_local_serial_select_and_config_regular() {
225+
// Given
226+
initPolicy("REGULAR");
227+
SimpleStatement statement =
228+
SimpleStatement.builder(
229+
"SELECT * FROM unique_key_value "
230+
+ "WHERE unique_key=? AND unique_value=? AND context=?")
231+
.setConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL)
232+
.setRoutingKeyspace(KEYSPACE)
233+
.setRoutingKey(ROUTING_KEY)
234+
.build();
235+
given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY))
236+
.willReturn(ImmutableList.of(node1, node2));
237+
238+
// When
239+
Queue<Node> plan = policy.newQueryPlan(statement, session);
240+
241+
// Then
242+
assertThat(statement.isLWT()).isFalse();
243+
assertThat(policy.getRequestRoutingMethod(statement)).isEqualTo(RequestRoutingMethod.REGULAR);
244+
assertThat(plan).containsExactlyInAnyOrder(node1, node2, node3);
245+
}
246+
247+
@Test
248+
public void should_dispatch_to_regular_query_plan_when_profile_has_local_serial_consistency() {
249+
// Given
250+
initPolicy("PRESERVE_REPLICA_ORDER");
251+
DriverExecutionProfile serialProfile = mock(DriverExecutionProfile.class);
252+
when(serialProfile.getString(DefaultDriverOption.REQUEST_CONSISTENCY))
253+
.thenReturn("LOCAL_SERIAL");
254+
SimpleStatement statement =
255+
SimpleStatement.builder(
256+
"SELECT * FROM unique_key_value "
257+
+ "WHERE unique_key=? AND unique_value=? AND context=?")
258+
.setExecutionProfile(serialProfile)
259+
.setRoutingKeyspace(KEYSPACE)
260+
.setRoutingKey(ROUTING_KEY)
261+
.build();
262+
given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY))
263+
.willReturn(ImmutableList.of(node1, node2));
264+
265+
// When
266+
Queue<Node> plan = policy.newQueryPlan(statement, session);
267+
268+
// Then
269+
assertThat(statement.getConsistencyLevel()).isNull();
270+
assertThat(policy.getRequestRoutingMethod(statement)).isEqualTo(RequestRoutingMethod.REGULAR);
271+
assertThat(plan).containsExactlyInAnyOrder(node1, node2, node3);
272+
}
273+
274+
@Test
275+
public void
276+
should_dispatch_to_regular_query_plan_when_profile_name_has_local_serial_consistency() {
277+
// Given
278+
initPolicy("PRESERVE_REPLICA_ORDER");
279+
DriverExecutionProfile serialProfile = mock(DriverExecutionProfile.class);
280+
when(config.getProfile("serial")).thenReturn(serialProfile);
281+
when(serialProfile.getString(DefaultDriverOption.REQUEST_CONSISTENCY))
282+
.thenReturn("LOCAL_SERIAL");
283+
SimpleStatement statement =
284+
SimpleStatement.builder(
285+
"SELECT * FROM unique_key_value "
286+
+ "WHERE unique_key=? AND unique_value=? AND context=?")
287+
.setExecutionProfileName("serial")
288+
.setRoutingKeyspace(KEYSPACE)
289+
.setRoutingKey(ROUTING_KEY)
290+
.build();
291+
given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY))
292+
.willReturn(ImmutableList.of(node1, node2));
293+
294+
// When
295+
Queue<Node> plan = policy.newQueryPlan(statement, session);
296+
297+
// Then
298+
assertThat(statement.getConsistencyLevel()).isNull();
299+
assertThat(policy.getRequestRoutingMethod(statement)).isEqualTo(RequestRoutingMethod.REGULAR);
300+
assertThat(plan).containsExactlyInAnyOrder(node1, node2, node3);
301+
}
302+
168303
@Test
169304
public void should_dispatch_to_preserve_query_plan_when_lwt_and_config_preserve() {
170305
// Given
@@ -186,6 +321,37 @@ public void should_dispatch_to_preserve_query_plan_when_lwt_and_config_preserve(
186321
assertThat(plan3).containsExactly(node2, node1, node3);
187322
}
188323

324+
private BoundStatement newRegularBoundStatementWithLocalSerialConsistency() {
325+
PreparedStatement preparedStatement = mock(PreparedStatement.class);
326+
ColumnDefinitions variableDefinitions = mock(ColumnDefinitions.class);
327+
when(preparedStatement.isLWT()).thenReturn(false);
328+
when(preparedStatement.getRequestRoutingType()).thenReturn(RequestRoutingType.REGULAR);
329+
when(preparedStatement.getVariableDefinitions()).thenReturn(variableDefinitions);
330+
return new DefaultBoundStatement(
331+
preparedStatement,
332+
variableDefinitions,
333+
new ByteBuffer[0],
334+
null,
335+
null,
336+
KEYSPACE,
337+
ROUTING_KEY,
338+
null,
339+
Collections.emptyMap(),
340+
null,
341+
false,
342+
Statement.NO_DEFAULT_TIMESTAMP,
343+
null,
344+
Integer.MIN_VALUE,
345+
DefaultConsistencyLevel.LOCAL_SERIAL,
346+
null,
347+
null,
348+
null,
349+
null,
350+
null,
351+
Statement.NO_NOW_IN_SECONDS,
352+
null);
353+
}
354+
189355
@Test
190356
public void should_dispatch_to_regular_query_plan_when_lwt_but_config_regular() {
191357
// Given

integration-tests/src/test/java/com/datastax/oss/driver/core/loadbalancing/LWTLoadBalancingMultiDcIT.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,22 @@
2323

2424
import static org.assertj.core.api.Assertions.assertThat;
2525
import static org.awaitility.Awaitility.await;
26+
import static org.junit.Assume.assumeTrue;
2627

2728
import com.datastax.oss.driver.api.core.CqlIdentifier;
2829
import com.datastax.oss.driver.api.core.CqlSession;
30+
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
2931
import com.datastax.oss.driver.api.core.ProtocolVersion;
3032
import com.datastax.oss.driver.api.core.RequestRoutingType;
3133
import com.datastax.oss.driver.api.core.Version;
3234
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
3335
import com.datastax.oss.driver.api.core.cql.BatchStatement;
3436
import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
3537
import com.datastax.oss.driver.api.core.cql.BatchType;
38+
import com.datastax.oss.driver.api.core.cql.BoundStatement;
3639
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
3740
import com.datastax.oss.driver.api.core.cql.ResultSet;
41+
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
3842
import com.datastax.oss.driver.api.core.metadata.Node;
3943
import com.datastax.oss.driver.api.core.metadata.TokenMap;
4044
import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
@@ -57,6 +61,7 @@
5761
public class LWTLoadBalancingMultiDcIT {
5862
private static final String LOCAL_DC = "dc1";
5963
private static final String KEYSPACE = "test";
64+
private static final String LOCAL_SERIAL_PROFILE = "local-serial";
6065

6166
private static final CustomCcmRule CCM_RULE =
6267
CustomCcmRule.builder().withNodes(2, 1).build(); // 2 nodes in DC1, 1 node in DC2
@@ -67,7 +72,12 @@ public class LWTLoadBalancingMultiDcIT {
6772
.withConfigLoader(
6873
SessionUtils.configLoaderBuilder()
6974
.withString(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER, LOCAL_DC)
75+
.withString(
76+
DefaultDriverOption.LOAD_BALANCING_DEFAULT_LWT_REQUEST_ROUTING_METHOD,
77+
"PRESERVE_REPLICA_ORDER")
7078
.withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(30))
79+
.startProfile(LOCAL_SERIAL_PROFILE)
80+
.withString(DefaultDriverOption.REQUEST_CONSISTENCY, "LOCAL_SERIAL")
7181
.build())
7282
.build();
7383

@@ -76,6 +86,7 @@ public class LWTLoadBalancingMultiDcIT {
7686

7787
public static final int FIRST_TEST_PARTITION_KEY = 4242;
7888
public static final int SECOND_TEST_PARTITION_KEY = 4343;
89+
public static final int THIRD_TEST_PARTITION_KEY = 4444;
7990
public static final int NUM_TEST_ITERATIONS = 30;
8091

8192
@BeforeClass
@@ -207,4 +218,63 @@ public void should_route_lwt_batch_to_local_dc_replicas() {
207218
assertThat(coordinators).isSubsetOf(localReplicas);
208219
assertThat(coordinatorDcs).containsOnly(LOCAL_DC);
209220
}
221+
222+
@Test
223+
public void should_mark_prepared_local_serial_simple_select_as_lwt() {
224+
assumeTrue(CcmBridge.isDistributionOf(BackendType.SCYLLA));
225+
226+
CqlSession session = SESSION_RULE.session();
227+
SimpleStatement simpleSelect =
228+
SimpleStatement.builder("SELECT * FROM test.foo WHERE pk = ? AND ck = ?")
229+
.setConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL)
230+
.build();
231+
PreparedStatement select = session.prepare(simpleSelect);
232+
BoundStatement statement = select.bind(THIRD_TEST_PARTITION_KEY, 0);
233+
234+
assertThat(simpleSelect.isLWT()).isFalse();
235+
assertThat(simpleSelect.getRequestRoutingType()).isNull();
236+
assertThat(simpleSelect.getConsistencyLevel()).isEqualTo(DefaultConsistencyLevel.LOCAL_SERIAL);
237+
238+
assertThat(select.isLWT()).isTrue();
239+
assertThat(select.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT);
240+
241+
assertThat(statement.isLWT()).isTrue();
242+
assertThat(statement.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT);
243+
assertThat(statement.getRoutingKeyspace()).isEqualTo(CqlIdentifier.fromCql(KEYSPACE));
244+
assertThat(statement.getRoutingKey()).isNotNull();
245+
assertThat(statement.getConsistencyLevel()).isEqualTo(DefaultConsistencyLevel.LOCAL_SERIAL);
246+
247+
ResultSet result = session.execute(statement);
248+
assertThat(result.getExecutionInfo().getCoordinator()).isNotNull();
249+
}
250+
251+
@Test
252+
public void should_mark_prepared_profiled_local_serial_simple_select_as_lwt() {
253+
assumeTrue(CcmBridge.isDistributionOf(BackendType.SCYLLA));
254+
255+
CqlSession session = SESSION_RULE.session();
256+
SimpleStatement simpleSelect =
257+
SimpleStatement.builder("SELECT * FROM test.foo WHERE pk = ? AND ck = ?")
258+
.setExecutionProfileName(LOCAL_SERIAL_PROFILE)
259+
.build();
260+
PreparedStatement select = session.prepare(simpleSelect);
261+
BoundStatement statement = select.bind(THIRD_TEST_PARTITION_KEY, 0);
262+
263+
assertThat(simpleSelect.isLWT()).isFalse();
264+
assertThat(simpleSelect.getRequestRoutingType()).isNull();
265+
assertThat(simpleSelect.getConsistencyLevel()).isNull();
266+
267+
assertThat(select.isLWT()).isTrue();
268+
assertThat(select.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT);
269+
270+
assertThat(statement.isLWT()).isTrue();
271+
assertThat(statement.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT);
272+
assertThat(statement.getExecutionProfileName()).isEqualTo(LOCAL_SERIAL_PROFILE);
273+
assertThat(statement.getRoutingKeyspace()).isEqualTo(CqlIdentifier.fromCql(KEYSPACE));
274+
assertThat(statement.getRoutingKey()).isNotNull();
275+
assertThat(statement.getConsistencyLevel()).isNull();
276+
277+
ResultSet result = session.execute(statement);
278+
assertThat(result.getExecutionInfo().getCoordinator()).isNotNull();
279+
}
210280
}

0 commit comments

Comments
 (0)