Skip to content

Commit 302b72c

Browse files
committed
Start consider SERIAL SELECT as LWT queries
Driver did not consider SERIAL SELECT as LWT and therefore routed them as regular queries causing LWT congestion. Fix is to consider consistency when RequestRoutingMethod is calculated.
1 parent bd9b26a commit 302b72c

4 files changed

Lines changed: 292 additions & 6 deletions

File tree

core/src/main/java/com/datastax/oss/driver/internal/core/cql/Conversions.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,20 @@ public static DefaultPreparedStatement toPreparedStatement(
401401
}
402402
}
403403

404+
ConsistencyLevel consistencyLevel = request.getConsistencyLevelForBoundStatements();
405+
if (consistencyLevel == null) {
406+
consistencyLevel =
407+
context
408+
.getConsistencyLevelRegistry()
409+
.nameToLevel(executionProfile.getString(DefaultDriverOption.REQUEST_CONSISTENCY));
410+
}
411+
412+
RequestRoutingType requestRoutingType = RequestRoutingType.REGULAR;
413+
if ((lwtInfo != null && lwtInfo.isLwt(response.variablesMetadata.flags))
414+
|| consistencyLevel.isSerial()) {
415+
requestRoutingType = RequestRoutingType.LWT;
416+
}
417+
404418
return new DefaultPreparedStatement(
405419
ByteBuffer.wrap(response.preparedQueryId).asReadOnlyBuffer(),
406420
request.getQuery(),
@@ -428,9 +442,7 @@ public static DefaultPreparedStatement toPreparedStatement(
428442
request.areBoundStatementsTracing(),
429443
context.getCodecRegistry(),
430444
context.getProtocolVersion(),
431-
lwtInfo != null && lwtInfo.isLwt(response.variablesMetadata.flags)
432-
? RequestRoutingType.LWT
433-
: RequestRoutingType.REGULAR);
445+
requestRoutingType);
434446
}
435447

436448
public static ColumnDefinitions toColumnDefinitions(

core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestRoutingTest.java

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,24 +25,33 @@
2525

2626
import static org.assertj.core.api.Assertions.assertThat;
2727
import static org.mockito.BDDMockito.given;
28+
import static org.mockito.Mockito.mock;
2829
import static org.mockito.Mockito.when;
2930

3031
import com.datastax.oss.driver.api.core.CqlIdentifier;
32+
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
3133
import com.datastax.oss.driver.api.core.RequestRoutingType;
3234
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
3335
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
36+
import com.datastax.oss.driver.api.core.cql.BoundStatement;
37+
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
38+
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
39+
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
40+
import com.datastax.oss.driver.api.core.cql.Statement;
3441
import com.datastax.oss.driver.api.core.metadata.Metadata;
3542
import com.datastax.oss.driver.api.core.metadata.Node;
3643
import com.datastax.oss.driver.api.core.metadata.TokenMap;
3744
import com.datastax.oss.driver.api.core.metadata.token.Token;
3845
import com.datastax.oss.driver.api.core.session.Request;
46+
import com.datastax.oss.driver.internal.core.cql.DefaultBoundStatement;
3947
import com.datastax.oss.driver.internal.core.loadbalancing.BasicLoadBalancingPolicy.RequestRoutingMethod;
4048
import com.datastax.oss.driver.internal.core.session.DefaultSession;
4149
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
4250
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
4351
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
4452
import com.datastax.oss.protocol.internal.util.Bytes;
4553
import java.nio.ByteBuffer;
54+
import java.util.Collections;
4655
import java.util.Optional;
4756
import java.util.Queue;
4857
import java.util.UUID;
@@ -165,6 +174,135 @@ public void should_dispatch_to_regular_query_plan_when_request_is_regular() {
165174
assertThat(plan2).containsExactlyInAnyOrder(node1, node2, node3);
166175
}
167176

177+
@Test
178+
public void
179+
should_dispatch_to_preserve_query_plan_when_simple_local_serial_select_and_config_preserve() {
180+
// Given
181+
initPolicy("PRESERVE_REPLICA_ORDER");
182+
SimpleStatement statement =
183+
SimpleStatement.builder(
184+
"SELECT * FROM unique_key_value "
185+
+ "WHERE unique_key=? AND unique_value=? AND context=?")
186+
.setConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL)
187+
.setRoutingKeyspace(KEYSPACE)
188+
.setRoutingKey(ROUTING_KEY)
189+
.build();
190+
given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY))
191+
.willReturn(ImmutableList.of(node1, node2));
192+
193+
// When
194+
Queue<Node> plan = policy.newQueryPlan(statement, session);
195+
196+
// Then
197+
assertThat(statement.isLWT()).isFalse();
198+
assertThat(statement.getRequestRoutingType()).isNull();
199+
assertThat(policy.getRequestRoutingMethod(statement))
200+
.isEqualTo(RequestRoutingMethod.PRESERVE_REPLICA_ORDER);
201+
assertThat(plan).containsExactly(node1, node2, node3);
202+
}
203+
204+
@Test
205+
public void
206+
should_dispatch_to_preserve_query_plan_when_prepared_local_serial_select_and_config_preserve() {
207+
// Given
208+
initPolicy("PRESERVE_REPLICA_ORDER");
209+
BoundStatement statement = newRegularBoundStatementWithLocalSerialConsistency();
210+
given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY))
211+
.willReturn(ImmutableList.of(node1, node2));
212+
213+
// When
214+
Queue<Node> plan = policy.newQueryPlan(statement, session);
215+
216+
// Then
217+
assertThat(statement.isLWT()).isFalse();
218+
assertThat(statement.getRequestRoutingType()).isEqualTo(RequestRoutingType.REGULAR);
219+
assertThat(policy.getRequestRoutingMethod(statement))
220+
.isEqualTo(RequestRoutingMethod.PRESERVE_REPLICA_ORDER);
221+
assertThat(plan).containsExactly(node1, node2, node3);
222+
}
223+
224+
@Test
225+
public void should_dispatch_to_regular_query_plan_when_local_serial_select_and_config_regular() {
226+
// Given
227+
initPolicy("REGULAR");
228+
SimpleStatement statement =
229+
SimpleStatement.builder(
230+
"SELECT * FROM unique_key_value "
231+
+ "WHERE unique_key=? AND unique_value=? AND context=?")
232+
.setConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL)
233+
.setRoutingKeyspace(KEYSPACE)
234+
.setRoutingKey(ROUTING_KEY)
235+
.build();
236+
given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY))
237+
.willReturn(ImmutableList.of(node1, node2));
238+
239+
// When
240+
Queue<Node> plan = policy.newQueryPlan(statement, session);
241+
242+
// Then
243+
assertThat(statement.isLWT()).isFalse();
244+
assertThat(policy.getRequestRoutingMethod(statement)).isEqualTo(RequestRoutingMethod.REGULAR);
245+
assertThat(plan).containsExactlyInAnyOrder(node1, node2, node3);
246+
}
247+
248+
@Test
249+
public void should_dispatch_to_preserve_query_plan_when_profile_has_local_serial_consistency() {
250+
// Given
251+
initPolicy("PRESERVE_REPLICA_ORDER");
252+
DriverExecutionProfile serialProfile = mock(DriverExecutionProfile.class);
253+
when(serialProfile.getString(DefaultDriverOption.REQUEST_CONSISTENCY))
254+
.thenReturn("LOCAL_SERIAL");
255+
SimpleStatement statement =
256+
SimpleStatement.builder(
257+
"SELECT * FROM unique_key_value "
258+
+ "WHERE unique_key=? AND unique_value=? AND context=?")
259+
.setExecutionProfile(serialProfile)
260+
.setRoutingKeyspace(KEYSPACE)
261+
.setRoutingKey(ROUTING_KEY)
262+
.build();
263+
given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY))
264+
.willReturn(ImmutableList.of(node1, node2));
265+
266+
// When
267+
Queue<Node> plan = policy.newQueryPlan(statement, session);
268+
269+
// Then
270+
assertThat(statement.getConsistencyLevel()).isNull();
271+
assertThat(policy.getRequestRoutingMethod(statement))
272+
.isEqualTo(RequestRoutingMethod.PRESERVE_REPLICA_ORDER);
273+
assertThat(plan).containsExactly(node1, node2, node3);
274+
}
275+
276+
@Test
277+
public void
278+
should_dispatch_to_preserve_query_plan_when_profile_name_has_local_serial_consistency() {
279+
// Given
280+
initPolicy("PRESERVE_REPLICA_ORDER");
281+
DriverExecutionProfile serialProfile = mock(DriverExecutionProfile.class);
282+
when(config.getProfile("serial")).thenReturn(serialProfile);
283+
when(serialProfile.getString(DefaultDriverOption.REQUEST_CONSISTENCY))
284+
.thenReturn("LOCAL_SERIAL");
285+
SimpleStatement statement =
286+
SimpleStatement.builder(
287+
"SELECT * FROM unique_key_value "
288+
+ "WHERE unique_key=? AND unique_value=? AND context=?")
289+
.setExecutionProfileName("serial")
290+
.setRoutingKeyspace(KEYSPACE)
291+
.setRoutingKey(ROUTING_KEY)
292+
.build();
293+
given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY))
294+
.willReturn(ImmutableList.of(node1, node2));
295+
296+
// When
297+
Queue<Node> plan = policy.newQueryPlan(statement, session);
298+
299+
// Then
300+
assertThat(statement.getConsistencyLevel()).isNull();
301+
assertThat(policy.getRequestRoutingMethod(statement))
302+
.isEqualTo(RequestRoutingMethod.PRESERVE_REPLICA_ORDER);
303+
assertThat(plan).containsExactly(node1, node2, node3);
304+
}
305+
168306
@Test
169307
public void should_dispatch_to_preserve_query_plan_when_lwt_and_config_preserve() {
170308
// Given
@@ -186,6 +324,37 @@ public void should_dispatch_to_preserve_query_plan_when_lwt_and_config_preserve(
186324
assertThat(plan3).containsExactly(node2, node1, node3);
187325
}
188326

327+
private BoundStatement newRegularBoundStatementWithLocalSerialConsistency() {
328+
PreparedStatement preparedStatement = mock(PreparedStatement.class);
329+
ColumnDefinitions variableDefinitions = mock(ColumnDefinitions.class);
330+
when(preparedStatement.isLWT()).thenReturn(false);
331+
when(preparedStatement.getRequestRoutingType()).thenReturn(RequestRoutingType.REGULAR);
332+
when(preparedStatement.getVariableDefinitions()).thenReturn(variableDefinitions);
333+
return new DefaultBoundStatement(
334+
preparedStatement,
335+
variableDefinitions,
336+
new ByteBuffer[0],
337+
null,
338+
null,
339+
KEYSPACE,
340+
ROUTING_KEY,
341+
null,
342+
Collections.emptyMap(),
343+
null,
344+
false,
345+
Statement.NO_DEFAULT_TIMESTAMP,
346+
null,
347+
Integer.MIN_VALUE,
348+
DefaultConsistencyLevel.LOCAL_SERIAL,
349+
null,
350+
null,
351+
null,
352+
null,
353+
null,
354+
Statement.NO_NOW_IN_SECONDS,
355+
null);
356+
}
357+
189358
@Test
190359
public void should_dispatch_to_regular_query_plan_when_lwt_but_config_regular() {
191360
// Given

integration-tests/src/test/java/com/datastax/oss/driver/core/loadbalancing/LWTLoadBalancingMultiDcIT.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,22 @@
2323

2424
import static org.assertj.core.api.Assertions.assertThat;
2525
import static org.awaitility.Awaitility.await;
26+
import static org.junit.Assume.assumeTrue;
2627

2728
import com.datastax.oss.driver.api.core.CqlIdentifier;
2829
import com.datastax.oss.driver.api.core.CqlSession;
30+
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
2931
import com.datastax.oss.driver.api.core.ProtocolVersion;
3032
import com.datastax.oss.driver.api.core.RequestRoutingType;
3133
import com.datastax.oss.driver.api.core.Version;
3234
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
3335
import com.datastax.oss.driver.api.core.cql.BatchStatement;
3436
import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
3537
import com.datastax.oss.driver.api.core.cql.BatchType;
38+
import com.datastax.oss.driver.api.core.cql.BoundStatement;
3639
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
3740
import com.datastax.oss.driver.api.core.cql.ResultSet;
41+
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
3842
import com.datastax.oss.driver.api.core.metadata.Node;
3943
import com.datastax.oss.driver.api.core.metadata.TokenMap;
4044
import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
@@ -57,6 +61,7 @@
5761
public class LWTLoadBalancingMultiDcIT {
5862
private static final String LOCAL_DC = "dc1";
5963
private static final String KEYSPACE = "test";
64+
private static final String LOCAL_SERIAL_PROFILE = "local-serial";
6065

6166
private static final CustomCcmRule CCM_RULE =
6267
CustomCcmRule.builder().withNodes(2, 1).build(); // 2 nodes in DC1, 1 node in DC2
@@ -67,7 +72,12 @@ public class LWTLoadBalancingMultiDcIT {
6772
.withConfigLoader(
6873
SessionUtils.configLoaderBuilder()
6974
.withString(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER, LOCAL_DC)
75+
.withString(
76+
DefaultDriverOption.LOAD_BALANCING_DEFAULT_LWT_REQUEST_ROUTING_METHOD,
77+
"PRESERVE_REPLICA_ORDER")
7078
.withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(30))
79+
.startProfile(LOCAL_SERIAL_PROFILE)
80+
.withString(DefaultDriverOption.REQUEST_CONSISTENCY, "LOCAL_SERIAL")
7181
.build())
7282
.build();
7383

@@ -76,6 +86,7 @@ public class LWTLoadBalancingMultiDcIT {
7686

7787
public static final int FIRST_TEST_PARTITION_KEY = 4242;
7888
public static final int SECOND_TEST_PARTITION_KEY = 4343;
89+
public static final int THIRD_TEST_PARTITION_KEY = 4444;
7990
public static final int NUM_TEST_ITERATIONS = 30;
8091

8192
@BeforeClass
@@ -207,4 +218,63 @@ public void should_route_lwt_batch_to_local_dc_replicas() {
207218
assertThat(coordinators).isSubsetOf(localReplicas);
208219
assertThat(coordinatorDcs).containsOnly(LOCAL_DC);
209220
}
221+
222+
@Test
223+
public void should_mark_prepared_local_serial_simple_select_as_lwt() {
224+
assumeTrue(CcmBridge.isDistributionOf(BackendType.SCYLLA));
225+
226+
CqlSession session = SESSION_RULE.session();
227+
SimpleStatement simpleSelect =
228+
SimpleStatement.builder("SELECT * FROM test.foo WHERE pk = ? AND ck = ?")
229+
.setConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL)
230+
.build();
231+
PreparedStatement select = session.prepare(simpleSelect);
232+
BoundStatement statement = select.bind(THIRD_TEST_PARTITION_KEY, 0);
233+
234+
assertThat(simpleSelect.isLWT()).isFalse();
235+
assertThat(simpleSelect.getRequestRoutingType()).isNull();
236+
assertThat(simpleSelect.getConsistencyLevel()).isEqualTo(DefaultConsistencyLevel.LOCAL_SERIAL);
237+
238+
assertThat(select.isLWT()).isTrue();
239+
assertThat(select.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT);
240+
241+
assertThat(statement.isLWT()).isTrue();
242+
assertThat(statement.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT);
243+
assertThat(statement.getRoutingKeyspace()).isEqualTo(CqlIdentifier.fromCql(KEYSPACE));
244+
assertThat(statement.getRoutingKey()).isNotNull();
245+
assertThat(statement.getConsistencyLevel()).isEqualTo(DefaultConsistencyLevel.LOCAL_SERIAL);
246+
247+
ResultSet result = session.execute(statement);
248+
assertThat(result.getExecutionInfo().getCoordinator()).isNotNull();
249+
}
250+
251+
@Test
252+
public void should_mark_prepared_profiled_local_serial_simple_select_as_lwt() {
253+
assumeTrue(CcmBridge.isDistributionOf(BackendType.SCYLLA));
254+
255+
CqlSession session = SESSION_RULE.session();
256+
SimpleStatement simpleSelect =
257+
SimpleStatement.builder("SELECT * FROM test.foo WHERE pk = ? AND ck = ?")
258+
.setExecutionProfileName(LOCAL_SERIAL_PROFILE)
259+
.build();
260+
PreparedStatement select = session.prepare(simpleSelect);
261+
BoundStatement statement = select.bind(THIRD_TEST_PARTITION_KEY, 0);
262+
263+
assertThat(simpleSelect.isLWT()).isFalse();
264+
assertThat(simpleSelect.getRequestRoutingType()).isNull();
265+
assertThat(simpleSelect.getConsistencyLevel()).isNull();
266+
267+
assertThat(select.isLWT()).isTrue();
268+
assertThat(select.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT);
269+
270+
assertThat(statement.isLWT()).isTrue();
271+
assertThat(statement.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT);
272+
assertThat(statement.getExecutionProfileName()).isEqualTo(LOCAL_SERIAL_PROFILE);
273+
assertThat(statement.getRoutingKeyspace()).isEqualTo(CqlIdentifier.fromCql(KEYSPACE));
274+
assertThat(statement.getRoutingKey()).isNotNull();
275+
assertThat(statement.getConsistencyLevel()).isNull();
276+
277+
ResultSet result = session.execute(statement);
278+
assertThat(result.getExecutionInfo().getCoordinator()).isNotNull();
279+
}
210280
}

0 commit comments

Comments
 (0)