Skip to content

Commit 035c3ef

Browse files
committed
Start consider SERIAL SELECT as LWT queries
Driver did not consider SERIAL SELECT as LWT and therefore routed them as regular queries causing LWT congestion. Fix is to consider consistency when RequestRoutingMethod is calculated.
1 parent bd9b26a commit 035c3ef

7 files changed

Lines changed: 319 additions & 15 deletions

File tree

core/src/main/java/com/datastax/oss/driver/internal/core/cql/Conversions.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,20 @@ public static DefaultPreparedStatement toPreparedStatement(
401401
}
402402
}
403403

404+
ConsistencyLevel consistencyLevel = request.getConsistencyLevelForBoundStatements();
405+
if (consistencyLevel == null) {
406+
consistencyLevel =
407+
context
408+
.getConsistencyLevelRegistry()
409+
.nameToLevel(executionProfile.getString(DefaultDriverOption.REQUEST_CONSISTENCY));
410+
}
411+
412+
RequestRoutingType requestRoutingType = RequestRoutingType.REGULAR;
413+
if ((lwtInfo != null && lwtInfo.isLwt(response.variablesMetadata.flags))
414+
|| (consistencyLevel != null && consistencyLevel.isSerial())) {
415+
requestRoutingType = RequestRoutingType.LWT;
416+
}
417+
404418
return new DefaultPreparedStatement(
405419
ByteBuffer.wrap(response.preparedQueryId).asReadOnlyBuffer(),
406420
request.getQuery(),
@@ -428,9 +442,7 @@ public static DefaultPreparedStatement toPreparedStatement(
428442
request.areBoundStatementsTracing(),
429443
context.getCodecRegistry(),
430444
context.getProtocolVersion(),
431-
lwtInfo != null && lwtInfo.isLwt(response.variablesMetadata.flags)
432-
? RequestRoutingType.LWT
433-
: RequestRoutingType.REGULAR);
445+
requestRoutingType);
434446
}
435447

436448
public static ColumnDefinitions toColumnDefinitions(

core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBoundStatement.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import java.util.Arrays;
4545
import java.util.List;
4646
import java.util.Map;
47-
import java.util.Objects;
4847
import net.jcip.annotations.Immutable;
4948

5049
@Immutable
@@ -805,9 +804,13 @@ public BoundStatement setNowInSeconds(int newNowInSeconds) {
805804
@Nullable
806805
@Override
807806
public RequestRoutingType getRequestRoutingType() {
808-
return Objects.nonNull(requestRoutingType)
809-
? requestRoutingType
810-
: preparedStatement.getRequestRoutingType();
807+
if (requestRoutingType != null) {
808+
return requestRoutingType;
809+
}
810+
if (consistencyLevel != null && consistencyLevel.isSerial()) {
811+
return RequestRoutingType.LWT;
812+
}
813+
return preparedStatement.getRequestRoutingType();
811814
}
812815

813816
@NonNull

core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultPreparedStatement.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,11 @@ public boolean isLWT() {
196196
@Nullable
197197
@Override
198198
public RequestRoutingType getRequestRoutingType() {
199+
if (requestRoutingType == null
200+
&& consistencyLevelForBoundStatements != null
201+
&& consistencyLevelForBoundStatements.isSerial()) {
202+
return RequestRoutingType.LWT;
203+
}
199204
return requestRoutingType;
200205
}
201206

core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultSimpleStatement.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -776,7 +776,13 @@ public SimpleStatement setNowInSeconds(int newNowInSeconds) {
776776
@Nullable
777777
@Override
778778
public RequestRoutingType getRequestRoutingType() {
779-
return requestRoutingType;
779+
if (requestRoutingType != null) {
780+
return requestRoutingType;
781+
}
782+
if (serialConsistencyLevel != null && serialConsistencyLevel.isSerial()) {
783+
return RequestRoutingType.LWT;
784+
}
785+
return null;
780786
}
781787

782788
@NonNull

core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestRoutingTest.java

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,24 +25,33 @@
2525

2626
import static org.assertj.core.api.Assertions.assertThat;
2727
import static org.mockito.BDDMockito.given;
28+
import static org.mockito.Mockito.mock;
2829
import static org.mockito.Mockito.when;
2930

3031
import com.datastax.oss.driver.api.core.CqlIdentifier;
32+
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
3133
import com.datastax.oss.driver.api.core.RequestRoutingType;
3234
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
3335
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
36+
import com.datastax.oss.driver.api.core.cql.BoundStatement;
37+
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
38+
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
39+
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
40+
import com.datastax.oss.driver.api.core.cql.Statement;
3441
import com.datastax.oss.driver.api.core.metadata.Metadata;
3542
import com.datastax.oss.driver.api.core.metadata.Node;
3643
import com.datastax.oss.driver.api.core.metadata.TokenMap;
3744
import com.datastax.oss.driver.api.core.metadata.token.Token;
3845
import com.datastax.oss.driver.api.core.session.Request;
46+
import com.datastax.oss.driver.internal.core.cql.DefaultBoundStatement;
3947
import com.datastax.oss.driver.internal.core.loadbalancing.BasicLoadBalancingPolicy.RequestRoutingMethod;
4048
import com.datastax.oss.driver.internal.core.session.DefaultSession;
4149
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
4250
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
4351
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
4452
import com.datastax.oss.protocol.internal.util.Bytes;
4553
import java.nio.ByteBuffer;
54+
import java.util.Collections;
4655
import java.util.Optional;
4756
import java.util.Queue;
4857
import java.util.UUID;
@@ -165,6 +174,132 @@ public void should_dispatch_to_regular_query_plan_when_request_is_regular() {
165174
assertThat(plan2).containsExactlyInAnyOrder(node1, node2, node3);
166175
}
167176

177+
@Test
178+
public void
179+
should_dispatch_to_regular_query_plan_when_simple_local_serial_select_and_config_preserve() {
180+
// Given
181+
initPolicy("PRESERVE_REPLICA_ORDER");
182+
SimpleStatement statement =
183+
SimpleStatement.builder(
184+
"SELECT * FROM unique_key_value "
185+
+ "WHERE unique_key=? AND unique_value=? AND context=?")
186+
.setConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL)
187+
.setRoutingKeyspace(KEYSPACE)
188+
.setRoutingKey(ROUTING_KEY)
189+
.build();
190+
given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY))
191+
.willReturn(ImmutableList.of(node1, node2));
192+
193+
// When
194+
Queue<Node> plan = policy.newQueryPlan(statement, session);
195+
196+
// Then
197+
assertThat(statement.isLWT()).isFalse();
198+
assertThat(statement.getRequestRoutingType()).isNull();
199+
assertThat(policy.getRequestRoutingMethod(statement)).isEqualTo(RequestRoutingMethod.REGULAR);
200+
assertThat(plan).containsExactlyInAnyOrder(node1, node2, node3);
201+
}
202+
203+
@Test
204+
public void
205+
should_dispatch_to_regular_query_plan_when_prepared_statement_routing_type_is_regular() {
206+
// Given
207+
initPolicy("PRESERVE_REPLICA_ORDER");
208+
BoundStatement statement = newRegularBoundStatementWithLocalSerialConsistency();
209+
given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY))
210+
.willReturn(ImmutableList.of(node1, node2));
211+
212+
// When
213+
Queue<Node> plan = policy.newQueryPlan(statement, session);
214+
215+
// Then
216+
assertThat(statement.isLWT()).isFalse();
217+
assertThat(statement.getRequestRoutingType()).isEqualTo(RequestRoutingType.LWT);
218+
assertThat(policy.getRequestRoutingMethod(statement))
219+
.isEqualTo(RequestRoutingMethod.PRESERVE_REPLICA_ORDER);
220+
assertThat(plan).containsExactlyInAnyOrder(node1, node2, node3);
221+
}
222+
223+
@Test
224+
public void should_dispatch_to_regular_query_plan_when_local_serial_select_and_config_regular() {
225+
// Given
226+
initPolicy("REGULAR");
227+
SimpleStatement statement =
228+
SimpleStatement.builder(
229+
"SELECT * FROM unique_key_value "
230+
+ "WHERE unique_key=? AND unique_value=? AND context=?")
231+
.setConsistencyLevel(DefaultConsistencyLevel.LOCAL_SERIAL)
232+
.setRoutingKeyspace(KEYSPACE)
233+
.setRoutingKey(ROUTING_KEY)
234+
.build();
235+
given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY))
236+
.willReturn(ImmutableList.of(node1, node2));
237+
238+
// When
239+
Queue<Node> plan = policy.newQueryPlan(statement, session);
240+
241+
// Then
242+
assertThat(statement.isLWT()).isFalse();
243+
assertThat(policy.getRequestRoutingMethod(statement)).isEqualTo(RequestRoutingMethod.REGULAR);
244+
assertThat(plan).containsExactlyInAnyOrder(node1, node2, node3);
245+
}
246+
247+
@Test
248+
public void should_dispatch_to_regular_query_plan_when_profile_has_local_serial_consistency() {
249+
// Given
250+
initPolicy("PRESERVE_REPLICA_ORDER");
251+
DriverExecutionProfile serialProfile = mock(DriverExecutionProfile.class);
252+
when(serialProfile.getString(DefaultDriverOption.REQUEST_CONSISTENCY))
253+
.thenReturn("LOCAL_SERIAL");
254+
SimpleStatement statement =
255+
SimpleStatement.builder(
256+
"SELECT * FROM unique_key_value "
257+
+ "WHERE unique_key=? AND unique_value=? AND context=?")
258+
.setExecutionProfile(serialProfile)
259+
.setRoutingKeyspace(KEYSPACE)
260+
.setRoutingKey(ROUTING_KEY)
261+
.build();
262+
given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY))
263+
.willReturn(ImmutableList.of(node1, node2));
264+
265+
// When
266+
Queue<Node> plan = policy.newQueryPlan(statement, session);
267+
268+
// Then
269+
assertThat(statement.getConsistencyLevel()).isNull();
270+
assertThat(policy.getRequestRoutingMethod(statement)).isEqualTo(RequestRoutingMethod.REGULAR);
271+
assertThat(plan).containsExactlyInAnyOrder(node1, node2, node3);
272+
}
273+
274+
@Test
275+
public void
276+
should_dispatch_to_regular_query_plan_when_profile_name_has_local_serial_consistency() {
277+
// Given
278+
initPolicy("PRESERVE_REPLICA_ORDER");
279+
DriverExecutionProfile serialProfile = mock(DriverExecutionProfile.class);
280+
when(config.getProfile("serial")).thenReturn(serialProfile);
281+
when(serialProfile.getString(DefaultDriverOption.REQUEST_CONSISTENCY))
282+
.thenReturn("LOCAL_SERIAL");
283+
SimpleStatement statement =
284+
SimpleStatement.builder(
285+
"SELECT * FROM unique_key_value "
286+
+ "WHERE unique_key=? AND unique_value=? AND context=?")
287+
.setExecutionProfileName("serial")
288+
.setRoutingKeyspace(KEYSPACE)
289+
.setRoutingKey(ROUTING_KEY)
290+
.build();
291+
given(tokenMap.getReplicasList(KEYSPACE, null, ROUTING_KEY))
292+
.willReturn(ImmutableList.of(node1, node2));
293+
294+
// When
295+
Queue<Node> plan = policy.newQueryPlan(statement, session);
296+
297+
// Then
298+
assertThat(statement.getConsistencyLevel()).isNull();
299+
assertThat(policy.getRequestRoutingMethod(statement)).isEqualTo(RequestRoutingMethod.REGULAR);
300+
assertThat(plan).containsExactlyInAnyOrder(node1, node2, node3);
301+
}
302+
168303
@Test
169304
public void should_dispatch_to_preserve_query_plan_when_lwt_and_config_preserve() {
170305
// Given
@@ -186,6 +321,37 @@ public void should_dispatch_to_preserve_query_plan_when_lwt_and_config_preserve(
186321
assertThat(plan3).containsExactly(node2, node1, node3);
187322
}
188323

324+
private BoundStatement newRegularBoundStatementWithLocalSerialConsistency() {
325+
PreparedStatement preparedStatement = mock(PreparedStatement.class);
326+
ColumnDefinitions variableDefinitions = mock(ColumnDefinitions.class);
327+
when(preparedStatement.isLWT()).thenReturn(false);
328+
when(preparedStatement.getRequestRoutingType()).thenReturn(RequestRoutingType.REGULAR);
329+
when(preparedStatement.getVariableDefinitions()).thenReturn(variableDefinitions);
330+
return new DefaultBoundStatement(
331+
preparedStatement,
332+
variableDefinitions,
333+
new ByteBuffer[0],
334+
null,
335+
null,
336+
KEYSPACE,
337+
ROUTING_KEY,
338+
null,
339+
Collections.emptyMap(),
340+
null,
341+
false,
342+
Statement.NO_DEFAULT_TIMESTAMP,
343+
null,
344+
Integer.MIN_VALUE,
345+
DefaultConsistencyLevel.LOCAL_SERIAL,
346+
null,
347+
null,
348+
null,
349+
null,
350+
null,
351+
Statement.NO_NOW_IN_SECONDS,
352+
null);
353+
}
354+
189355
@Test
190356
public void should_dispatch_to_regular_query_plan_when_lwt_but_config_regular() {
191357
// Given

0 commit comments

Comments
 (0)