Skip to content

Commit 93065c9

Browse files
nikagradkropachev
authored andcommitted
refactor: address dkropachev review comments on PR #891
- Remove Statement.hasSerialConsistency() (too specific for public API); inline the serial-CL detection in each policy - Revert LOCAL_SERIAL local-only filtering in PreserveReplicaOrderIterator; remote replicas now appear in the tail for both SERIAL and LOCAL_SERIAL (alignment with other drivers deferred to a separate task per reviewer) - Store ConsistencyLevel (not QueryOptions) in LatencyAwarePolicy; expose via private isEffectiveConsistencySerial() helper - Update TokenAwarePolicyTest: replace two separate LOCAL_SERIAL/SERIAL remote-replica tests with a single parameterised test covering both CLs
1 parent e9d38b1 commit 93065c9

5 files changed

Lines changed: 34 additions & 72 deletions

File tree

driver-core/src/main/java/com/datastax/driver/core/Statement.java

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -91,27 +91,6 @@ public boolean isLWT() {
9191
return false;
9292
}
9393

94-
/**
95-
* Returns {@code true} if this statement's effective consistency level is serial ({@link
96-
* ConsistencyLevel#SERIAL} or {@link ConsistencyLevel#LOCAL_SERIAL}). When no consistency level
97-
* has been explicitly set on this statement, falls back to the provided default.
98-
*
99-
* <p>This is used by load-balancing policies to route serial-consistency statements through the
100-
* LWT path, even when {@link #isLWT()} returns {@code false}.
101-
*
102-
* @param defaultConsistencyLevel the cluster-wide default consistency level to fall back to when
103-
* this statement has no explicit consistency level set.
104-
* @return whether the effective consistency level is serial.
105-
*/
106-
public static boolean hasSerialConsistency(
107-
Statement statement, ConsistencyLevel defaultConsistencyLevel) {
108-
ConsistencyLevel cl = statement.getConsistencyLevel();
109-
if (cl == null) {
110-
cl = defaultConsistencyLevel;
111-
}
112-
return cl != null && cl.isSerial();
113-
}
114-
11594
/**
11695
* Sets the consistency level for the query.
11796
*

driver-core/src/main/java/com/datastax/driver/core/policies/LatencyAwarePolicy.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@
1717

1818
import com.codahale.metrics.Gauge;
1919
import com.datastax.driver.core.Cluster;
20+
import com.datastax.driver.core.ConsistencyLevel;
2021
import com.datastax.driver.core.Host;
2122
import com.datastax.driver.core.HostDistance;
2223
import com.datastax.driver.core.LatencyTracker;
2324
import com.datastax.driver.core.Metrics;
2425
import com.datastax.driver.core.MetricsUtil;
25-
import com.datastax.driver.core.QueryOptions;
2626
import com.datastax.driver.core.Statement;
2727
import com.datastax.driver.core.exceptions.BootstrappingException;
2828
import com.datastax.driver.core.exceptions.DriverException;
@@ -103,7 +103,7 @@ public class LatencyAwarePolicy implements ChainableLoadBalancingPolicy {
103103
private final long retryPeriod;
104104
private final long minMeasure;
105105
private volatile Metrics metrics;
106-
private volatile QueryOptions queryOptions;
106+
private volatile ConsistencyLevel defaultConsistencyLevel;
107107

108108
private LatencyAwarePolicy(
109109
LoadBalancingPolicy childPolicy,
@@ -220,7 +220,7 @@ public void init(Cluster cluster, Collection<Host> hosts) {
220220
}
221221
cluster.register(latencyTracker);
222222
metrics = cluster.getMetrics();
223-
queryOptions = cluster.getConfiguration().getQueryOptions();
223+
defaultConsistencyLevel = cluster.getConfiguration().getQueryOptions().getConsistencyLevel();
224224
if (metrics != null) {
225225
metrics
226226
.getRegistry()
@@ -265,9 +265,7 @@ public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement) {
265265
// LWT routing can rely on deterministic replica ordering (e.g. by TokenAwarePolicy), and
266266
// latency-based reordering can undermine those assumptions.
267267
if (statement != null
268-
&& (statement.isLWT()
269-
|| Statement.hasSerialConsistency(
270-
statement, queryOptions != null ? queryOptions.getConsistencyLevel() : null))) {
268+
&& (statement.isLWT() || isEffectiveConsistencySerial(statement.getConsistencyLevel()))) {
271269
return childPolicy.newQueryPlan(loggedKeyspace, statement);
272270
}
273271

@@ -340,6 +338,11 @@ protected Host computeNext() {
340338
};
341339
}
342340

341+
private boolean isEffectiveConsistencySerial(ConsistencyLevel statementCl) {
342+
ConsistencyLevel cl = statementCl != null ? statementCl : defaultConsistencyLevel;
343+
return cl != null && cl.isSerial();
344+
}
345+
343346
/**
344347
* Returns a snapshot of the scores (latency averages) maintained by this policy.
345348
*

driver-core/src/main/java/com/datastax/driver/core/policies/RackAwareRoundRobinPolicy.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,9 +250,12 @@ public Iterator<Host> newQueryPlan(String loggedKeyspace, final Statement statem
250250
// hosts equally
251251
ConsistencyLevel defaultCl =
252252
configuration != null ? configuration.getQueryOptions().getConsistencyLevel() : null;
253+
ConsistencyLevel effectiveCl =
254+
statement != null && statement.getConsistencyLevel() != null
255+
? statement.getConsistencyLevel()
256+
: defaultCl;
253257
final boolean isLWT =
254-
statement != null
255-
&& (statement.isLWT() || Statement.hasSerialConsistency(statement, defaultCl));
258+
statement != null && (statement.isLWT() || (effectiveCl != null && effectiveCl.isSerial()));
256259

257260
// For LWT queries, include all local DC hosts in the first part of the plan, not just those in
258261
// the local rack

driver-core/src/main/java/com/datastax/driver/core/policies/TokenAwarePolicy.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,6 @@ private class PreserveReplicaOrderIterator extends AbstractIterator<Host> {
267267
private final List<Host> replicas;
268268
private final String keyspace;
269269
private final Statement statement;
270-
private final boolean localOnly;
271270
private List<Host> nonLocalReplicas;
272271
private Iterator<Host> nonLocalReplicasIterator;
273272
private Set<Host> returnedHosts;
@@ -278,11 +277,6 @@ public PreserveReplicaOrderIterator(String keyspace, Statement statement, List<H
278277
this.statement = statement;
279278
this.replicas = replicas;
280279
this.replicasIterator = replicas.iterator();
281-
ConsistencyLevel cl = statement.getConsistencyLevel();
282-
if (cl == null && queryOptions != null) {
283-
cl = queryOptions.getConsistencyLevel();
284-
}
285-
this.localOnly = cl == ConsistencyLevel.LOCAL_SERIAL;
286280
}
287281

288282
@Override
@@ -314,7 +308,7 @@ protected Host computeNext() {
314308
}
315309

316310
// Second pass: return remote replicas that are UP and not IGNORED
317-
if (nonLocalReplicas != null && !localOnly) {
311+
if (nonLocalReplicas != null) {
318312
if (nonLocalReplicasIterator == null) {
319313
nonLocalReplicasIterator = nonLocalReplicas.iterator();
320314
}
@@ -479,8 +473,11 @@ private QueryOptions.RequestRoutingMethod getRequestRouting(Statement statement)
479473
if (defaultLwtRequestRoutingMethod == null) {
480474
return QueryOptions.RequestRoutingMethod.REGULAR;
481475
}
482-
if (statement.isLWT()
483-
|| Statement.hasSerialConsistency(statement, queryOptions.getConsistencyLevel())) {
476+
ConsistencyLevel cl = statement.getConsistencyLevel();
477+
if (cl == null) {
478+
cl = queryOptions.getConsistencyLevel();
479+
}
480+
if (statement.isLWT() || (cl != null && cl.isSerial())) {
484481
return defaultLwtRequestRoutingMethod;
485482
}
486483
return QueryOptions.RequestRoutingMethod.REGULAR;

driver-core/src/test/java/com/datastax/driver/core/policies/TokenAwarePolicyTest.java

Lines changed: 14 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,14 @@ public Object[][] shuffleProvider() {
125125
};
126126
}
127127

128+
@DataProvider(name = "serialConsistencyProvider")
129+
public Object[][] serialConsistencyProvider() {
130+
return new Object[][] {
131+
{com.datastax.driver.core.ConsistencyLevel.SERIAL},
132+
{com.datastax.driver.core.ConsistencyLevel.LOCAL_SERIAL}
133+
};
134+
}
135+
128136
@Test(groups = "unit")
129137
public void should_respect_topological_order() {
130138
// given
@@ -737,41 +745,13 @@ public void should_not_route_serial_consistency_level_option_as_lwt() {
737745
assertThat(queryPlan).containsExactly(host1, host2, host4, host3);
738746
}
739747

740-
@Test(groups = "unit")
741-
public void should_not_include_remote_replicas_for_local_serial() {
742-
// given: a LOCAL_SERIAL statement with some replicas in remote DC
743-
Statement localSerialStatement = mock(Statement.class);
744-
when(localSerialStatement.isLWT()).thenReturn(false);
745-
when(localSerialStatement.getConsistencyLevel())
746-
.thenReturn(com.datastax.driver.core.ConsistencyLevel.LOCAL_SERIAL);
747-
when(localSerialStatement.getRoutingKey(any(ProtocolVersion.class), any(CodecRegistry.class)))
748-
.thenReturn(routingKey);
749-
when(localSerialStatement.getKeyspace()).thenReturn(KEYSPACE);
750-
when(metadata.getReplicasList(Metadata.quote(KEYSPACE), null, null, routingKey))
751-
.thenReturn(Lists.newArrayList(host1, host2, host3));
752-
when(childPolicy.distance(host1)).thenReturn(HostDistance.LOCAL);
753-
when(childPolicy.distance(host2)).thenReturn(HostDistance.REMOTE);
754-
when(childPolicy.distance(host3)).thenReturn(HostDistance.LOCAL);
755-
when(childPolicy.newQueryPlan(KEYSPACE, localSerialStatement))
756-
.thenReturn(Lists.newArrayList(host4, host5).iterator());
757-
758-
TokenAwarePolicy policy = new TokenAwarePolicy(childPolicy, TOPOLOGICAL);
759-
policy.init(cluster, null);
760-
761-
// when
762-
Iterator<Host> queryPlan = policy.newQueryPlan(KEYSPACE, localSerialStatement);
763-
764-
// then: only local replicas (host1, host3), no remote replica (host2), then non-replicas
765-
assertThat(queryPlan).containsExactly(host1, host3, host4, host5);
766-
}
767-
768-
@Test(groups = "unit")
769-
public void should_include_remote_replicas_for_serial() {
770-
// given: a SERIAL statement with some replicas in remote DC
748+
@Test(groups = "unit", dataProvider = "serialConsistencyProvider")
749+
public void should_include_remote_replicas_for_serial_consistency(
750+
com.datastax.driver.core.ConsistencyLevel serialCl) {
751+
// given: a serial-CL statement with some replicas in remote DC
771752
Statement serialStatement = mock(Statement.class);
772753
when(serialStatement.isLWT()).thenReturn(false);
773-
when(serialStatement.getConsistencyLevel())
774-
.thenReturn(com.datastax.driver.core.ConsistencyLevel.SERIAL);
754+
when(serialStatement.getConsistencyLevel()).thenReturn(serialCl);
775755
when(serialStatement.getRoutingKey(any(ProtocolVersion.class), any(CodecRegistry.class)))
776756
.thenReturn(routingKey);
777757
when(serialStatement.getKeyspace()).thenReturn(KEYSPACE);
@@ -789,7 +769,7 @@ public void should_include_remote_replicas_for_serial() {
789769
// when
790770
Iterator<Host> queryPlan = policy.newQueryPlan(KEYSPACE, serialStatement);
791771

792-
// then: local replicas (host1, host3), then remote replica (host2), then non-replicas
772+
// then: local replicas first (host1, host3), then remote replica (host2), then non-replicas
793773
assertThat(queryPlan).containsExactly(host1, host3, host2, host4, host5);
794774
}
795775

0 commit comments

Comments
 (0)