-
Notifications
You must be signed in to change notification settings - Fork 30
CASSANALYTICS-160: Analytics should identify if a keyspace is tracked to determine appropriate stream session to use for bulk writes #214
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: mutation-tracking-support
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -52,6 +52,9 @@ public final class CqlUtils | |
| "max_index_interval" | ||
| ); | ||
| private static final Pattern REPLICATION_FACTOR_PATTERN = Pattern.compile("WITH REPLICATION = (\\{[^\\}]*\\})"); | ||
| private static final String TRACKED_REPLICATION_TYPE = "tracked"; | ||
| private static final Pattern REPLICATION_TYPE_PATTERN = Pattern.compile("replication_type\\s*=\\s*'(\\w+)'", | ||
| Pattern.CASE_INSENSITIVE); | ||
| // Initialize a mapper allowing single quotes to process the RF string from the CREATE KEYSPACE statement | ||
| private static final ObjectMapper MAPPER = new ObjectMapper().configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true); | ||
| private static final Pattern ESCAPED_WHITESPACE_PATTERN = Pattern.compile("(\\\\r|\\\\n|\\\\r\\n)+"); | ||
|
|
@@ -296,4 +299,33 @@ public static boolean isTimeRangeFilterSupported(String compactionStrategy) | |
| { | ||
| return compactionStrategy == null || compactionStrategy.endsWith("TimeWindowCompactionStrategy"); | ||
| } | ||
|
|
||
| /** | ||
| * Extracts replication type from create schema statement | ||
| * | ||
| * @param schemaStr full cluster schema string as returned by Sidecar | ||
| * @param keyspace name of the keyspace to check | ||
| * @return {@code true} if keyspace is tracked {@code false} otherwise | ||
|
sarankk marked this conversation as resolved.
|
||
| */ | ||
| public static String extractReplicationType(@NotNull String schemaStr, @NotNull String keyspace) | ||
| { | ||
| String createKeyspaceSchema = extractKeyspaceSchema(schemaStr, keyspace); | ||
| Matcher matcher = REPLICATION_TYPE_PATTERN.matcher(createKeyspaceSchema); | ||
| if (matcher.find()) | ||
| { | ||
| return matcher.group(1); | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| /** | ||
| * Returns {@code true} if {@code replication_type = 'tracked'} in create statement otherwise {@code false} | ||
| * | ||
| * @param replicationType replication type extracted from create statement | ||
| * @return {@code true} if replication type is tracked {@code false} otherwise | ||
| */ | ||
| public static boolean isTracked(String replicationType) | ||
| { | ||
| return TRACKED_REPLICATION_TYPE.equalsIgnoreCase(replicationType); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here we are checking for replication_type value only. But the cluster need to have mutation_tracking.enabled to true for the feature to work at Cassandra side. Can you please explore what happens in Cassandra if a keyspace was created with replication_type = tracked, but then later admins changed mutation_tracking.enabled to false? Based on these findings we may need to check for mutation_tracking.enabled as well.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch Shailaja! thanks, looks like we need to check the cluster level flag as well |
||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -356,6 +356,17 @@ public ReplicationFactor replicationFactor() | |
| } | ||
| } | ||
|
|
||
| @Override | ||
| public String getReplicationType() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. CassandraClusterInfoGroup not overriding this, i.e, doesn't detect tracked keyspaces
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| { | ||
| String keyspaceSchema = getKeyspaceSchema(true); | ||
| if (keyspaceSchema == null) | ||
| { | ||
| throw new RuntimeException("Could not retrieve keyspace schema information for keyspace " + conf.keyspace); | ||
| } | ||
| return CqlUtils.extractReplicationType(keyspaceSchema, conf.keyspace); | ||
| } | ||
|
|
||
| @Override | ||
| public TokenRangeMapping<RingInstance> getTokenRangeMapping(boolean cached) | ||
| { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.cassandra.spark.bulkwriter; | ||
|
|
||
| import java.math.BigInteger; | ||
| import java.util.Set; | ||
| import java.util.concurrent.ExecutorService; | ||
|
|
||
| import com.google.common.collect.Range; | ||
|
|
||
| import org.apache.cassandra.bridge.SSTableDescriptor; | ||
| import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler; | ||
|
|
||
| /** | ||
| * Stream session for bulk writes to keyspaces with mutation tracking enabled. | ||
| * | ||
| * <p> | ||
| * Tracked stream session uploads and triggers import on the <em>coordinator node only</em>. Cassandra's coordinated | ||
| * transfer then propagates the data to all other replicas, avoiding the duplicate-row updates that would occur if each | ||
| * replica independently streamed the data to its peers. | ||
| * <p> | ||
| */ | ||
| public class TrackedDirectStreamSession extends StreamSession<TransportContext.DirectDataBulkWriterContext> | ||
|
sarankk marked this conversation as resolved.
|
||
| { | ||
| public TrackedDirectStreamSession(BulkWriterContext writerContext, | ||
| SortedSSTableWriter sstableWriter, | ||
| TransportContext.DirectDataBulkWriterContext transportContext, | ||
| String sessionID, | ||
| Range<BigInteger> tokenRange, | ||
| ReplicaAwareFailureHandler<RingInstance> failureHandler, | ||
| ExecutorService executorService) | ||
| { | ||
| super(writerContext, sstableWriter, transportContext, sessionID, tokenRange, failureHandler, executorService); | ||
| } | ||
|
|
||
| @Override | ||
| protected void onSSTablesProduced(Set<SSTableDescriptor> sstables) | ||
| { | ||
| throw new UnsupportedOperationException("TrackedDirectStreamSession is not yet implemented"); | ||
| } | ||
|
|
||
| @Override | ||
| protected StreamResult doFinalizeStream() | ||
| { | ||
| throw new UnsupportedOperationException("TrackedDirectStreamSession is not yet implemented"); | ||
| } | ||
|
|
||
| @Override | ||
| protected void sendRemainingSSTables() | ||
| { | ||
| throw new UnsupportedOperationException("TrackedDirectStreamSession is not yet implemented"); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,8 +30,10 @@ | |
| import o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse; | ||
| import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; | ||
| import org.apache.cassandra.spark.exception.TimeSkewTooLargeException; | ||
| import org.apache.cassandra.spark.utils.CqlUtils; | ||
|
|
||
| import static org.apache.cassandra.spark.TestUtils.range; | ||
| import static org.assertj.core.api.Assertions.assertThat; | ||
| import static org.assertj.core.api.Assertions.assertThatNoException; | ||
| import static org.assertj.core.api.Assertions.assertThatThrownBy; | ||
| import static org.mockito.ArgumentMatchers.any; | ||
|
|
@@ -75,6 +77,50 @@ public static CassandraClusterInfo mockClusterInfoForTimeSkewTest(int allowanceM | |
| return new MockClusterInfoForTimeSkew(allowanceMinutes, remoteNow); | ||
| } | ||
|
|
||
| @Test | ||
| void testGetTrackedReplicationType() | ||
| { | ||
| String schema = "CREATE KEYSPACE mykeyspace " | ||
| + "WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': '3'}" | ||
| + " AND durable_writes = true" | ||
| + " AND replication_type = 'tracked';"; | ||
| CassandraClusterInfo ci = mockClusterInfoWithSchema("mykeyspace", schema); | ||
| assertThat(ci.getReplicationType()) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These tests are not testing changes to CassandraClusterInfo, because MockClusterInfoForSchema overriding getReplicationType to call CqlUtils.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes otherwise we need to mock sidecar client calls to get schema, avoiding that
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we have in-jvm tests to test CassandraClusterInfo code path then? Tests in CassandraClusterInfoTest.java should test CassandraClusterInfo. If it is difficult, we can have in-jvm tests which go through this class.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @skoppu22 I plan to add in-jvm dtest in the follow up PR which adds the new stream session for write |
||
| .describedAs("Keyspace with replication_type = 'tracked' should return 'tracked'") | ||
| .isEqualTo("tracked"); | ||
| } | ||
|
|
||
| @Test | ||
| void testGetUntrackedReplicationType() | ||
| { | ||
| String schema = "CREATE KEYSPACE k " | ||
| + "WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': '3'}" | ||
| + " AND durable_writes = true" | ||
| + " AND replication_type = 'untracked';"; | ||
| CassandraClusterInfo ci = mockClusterInfoWithSchema("k", schema); | ||
| assertThat(ci.getReplicationType()) | ||
| .describedAs("Keyspace with replication_type = 'untracked' should return 'untracked'") | ||
| .isEqualTo("untracked"); | ||
| } | ||
|
|
||
| @Test | ||
| void testGetReplicationTypeReturnsNullWhenPropertyAbsent() | ||
| { | ||
| // Schema without replication_type — pre-mutation-tracking clusters | ||
| String schema = "CREATE KEYSPACE k " | ||
| + "WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': '3'}" | ||
| + " AND durable_writes = true;"; | ||
| CassandraClusterInfo ci = mockClusterInfoWithSchema("k", schema); | ||
| assertThat(ci.getReplicationType()) | ||
| .describedAs("Keyspace without replication_type property should return null") | ||
| .isNull(); | ||
| } | ||
|
|
||
| private static CassandraClusterInfo mockClusterInfoWithSchema(String keyspace, String schemaStr) | ||
| { | ||
| return new MockClusterInfoForSchema(keyspace, schemaStr); | ||
| } | ||
|
|
||
| private static class MockClusterInfoForTimeSkew extends CassandraClusterInfo | ||
| { | ||
| private CassandraContext cassandraContext; | ||
|
|
@@ -107,4 +153,39 @@ private void mockCassandraContext(int allowanceMinutes, Instant remoteNow) | |
| when(cassandraContext.sidecarPort()).thenReturn(9043); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Minimal ClusterInfo stub that returns a fixed keyspace schema string, used to test | ||
| * {@link CassandraClusterInfo#getReplicationType()}. | ||
| */ | ||
| private static class MockClusterInfoForSchema extends CassandraClusterInfo | ||
| { | ||
| private final String keyspaceName; | ||
| private final String schemaStr; | ||
|
|
||
| MockClusterInfoForSchema(String keyspace, String schemaStr) | ||
| { | ||
| super((BulkSparkConf) null); | ||
| this.keyspaceName = keyspace; | ||
| this.schemaStr = schemaStr; | ||
| } | ||
|
|
||
| @Override | ||
| protected CassandraContext buildCassandraContext() | ||
| { | ||
| return mock(CassandraContext.class, RETURNS_DEEP_STUBS); | ||
| } | ||
|
|
||
| @Override | ||
| public String getKeyspaceSchema(boolean cached) | ||
| { | ||
| return schemaStr; | ||
| } | ||
|
|
||
| @Override | ||
| public String getReplicationType() | ||
| { | ||
| return CqlUtils.extractReplicationType(schemaStr, keyspaceName); | ||
| } | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.