Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
0.5.0
-----
* Analytics should identify if a keyspace is tracked to determine appropriate stream session to use for bulk writes (CASSANALYTICS-160)
Comment thread
sarankk marked this conversation as resolved.
* Avoid Spark 4 partitioning warnings during bulk reads (CASSANALYTICS-171)
* Spark 4.0 Support (CASSANALYTICS-34)
* Add IAM credential support for S3 storage transport (CASSANALYTICS-155)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ public final class CqlUtils
"max_index_interval"
);
private static final Pattern REPLICATION_FACTOR_PATTERN = Pattern.compile("WITH REPLICATION = (\\{[^\\}]*\\})");
private static final String TRACKED_REPLICATION_TYPE = "tracked";
private static final Pattern REPLICATION_TYPE_PATTERN = Pattern.compile("replication_type\\s*=\\s*'(\\w+)'",
Pattern.CASE_INSENSITIVE);
// Initialize a mapper allowing single quotes to process the RF string from the CREATE KEYSPACE statement
private static final ObjectMapper MAPPER = new ObjectMapper().configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
private static final Pattern ESCAPED_WHITESPACE_PATTERN = Pattern.compile("(\\\\r|\\\\n|\\\\r\\n)+");
Expand Down Expand Up @@ -296,4 +299,33 @@ public static boolean isTimeRangeFilterSupported(String compactionStrategy)
{
return compactionStrategy == null || compactionStrategy.endsWith("TimeWindowCompactionStrategy");
}

/**
* Extracts replication type from create schema statement
*
* @param schemaStr full cluster schema string as returned by Sidecar
* @param keyspace name of the keyspace to check
* @return the {@code replication_type} value found in the keyspace's create statement, or {@code null} if none is declared
Comment thread
sarankk marked this conversation as resolved.
*/
public static String extractReplicationType(@NotNull String schemaStr, @NotNull String keyspace)
{
    // Search only the text that extractKeyspaceSchema returns for the requested keyspace
    Matcher replicationType = REPLICATION_TYPE_PATTERN.matcher(extractKeyspaceSchema(schemaStr, keyspace));
    // Group 1 is the quoted value of replication_type; no match means the property is not declared
    return replicationType.find() ? replicationType.group(1) : null;
}

/**
* Returns {@code true} if {@code replication_type = 'tracked'} in create statement otherwise {@code false}
*
* @param replicationType replication type extracted from create statement
* @return {@code true} if replication type is tracked {@code false} otherwise
*/
public static boolean isTracked(String replicationType)
{
return TRACKED_REPLICATION_TYPE.equalsIgnoreCase(replicationType);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we are checking the replication_type value only. But the cluster needs to have mutation_tracking.enabled set to true for the feature to work on the Cassandra side. Can you please explore what happens in Cassandra if a keyspace was created with replication_type = tracked, but admins later changed mutation_tracking.enabled to false? Based on these findings, we may need to check mutation_tracking.enabled as well.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, Shailaja! Thanks. It looks like we need to check the cluster-level flag as well: replication_type = tracked does not guarantee that mutation tracking is enabled at the cluster level.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,17 @@ public ReplicationFactor replicationFactor()
}
}

@Override
public String getReplicationType()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CassandraClusterInfoGroup does not override this, i.e., it doesn't detect tracked keyspaces.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CassandraClusterInfoGroup is used for bulk writes through S3; for now we will not support writes to tracked tables via S3. We will handle the S3 code path changes in a separate JIRA.

{
String keyspaceSchema = getKeyspaceSchema(true);
if (keyspaceSchema == null)
{
throw new RuntimeException("Could not retrieve keyspace schema information for keyspace " + conf.keyspace);
}
return CqlUtils.extractReplicationType(keyspaceSchema, conf.keyspace);
}

@Override
public TokenRangeMapping<RingInstance> getTokenRangeMapping(boolean cached)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,24 @@ public CassandraDirectDataTransportContext(@NotNull BulkWriterContext bulkWriter
}

@Override
public DirectStreamSession createStreamSession(BulkWriterContext writerContext,
String sessionId,
Comment thread
sarankk marked this conversation as resolved.
SortedSSTableWriter sstableWriter,
Range<BigInteger> range,
ReplicaAwareFailureHandler<RingInstance> failureHandler,
ExecutorService executorService)
public StreamSession<TransportContext.DirectDataBulkWriterContext> createStreamSession(
BulkWriterContext writerContext,
String sessionId,
SortedSSTableWriter sstableWriter,
Range<BigInteger> range,
ReplicaAwareFailureHandler<RingInstance> failureHandler,
ExecutorService executorService)
{
if (bridge().isTracked(clusterInfo.getReplicationType()))
{
return new TrackedDirectStreamSession(writerContext,
Comment thread
sarankk marked this conversation as resolved.
sstableWriter,
this,
sessionId,
range,
failureHandler,
executorService);
}
return new DirectStreamSession(writerContext,
sstableWriter,
this,
Expand All @@ -71,7 +82,11 @@ public DirectDataTransferApi dataTransferApi()
// only invoke in constructor
protected DirectDataTransferApi createDirectDataTransferApi()
{
CassandraBridge bridge = CassandraBridgeFactory.get(clusterInfo.getLowestCassandraVersion());
return new SidecarDataTransferApi(clusterInfo.getCassandraContext(), bridge, jobInfo);
return new SidecarDataTransferApi(clusterInfo.getCassandraContext(), bridge(), jobInfo);
}

private CassandraBridge bridge()
Comment thread
sarankk marked this conversation as resolved.
{
return CassandraBridgeFactory.get(clusterInfo.getLowestCassandraVersion());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,16 @@ public interface ClusterInfo extends StartupValidatable
*/
ReplicationFactor replicationFactor();

/**
* Reports the {@code replication_type} declared for the keyspace this cluster info refers to.
* This default implementation always returns {@code null}; implementations that can inspect the
* keyspace schema are expected to override it.
*
* @return {@code replication_type} of the enclosing keyspace (e.g. {@code "tracked"}, {@code "untracked"}),
* or {@code null} if replication_type is absent
*/
@Nullable
default String getReplicationType()
{
return null;
}

CassandraContext getCassandraContext();

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.cassandra.spark.bulkwriter;

import java.math.BigInteger;
import java.util.Set;
import java.util.concurrent.ExecutorService;

import com.google.common.collect.Range;

import org.apache.cassandra.bridge.SSTableDescriptor;
import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;

/**
* Stream session for bulk writes to keyspaces with mutation tracking enabled.
*
* <p>
* Tracked stream session uploads and triggers import on the <em>coordinator node only</em>. Cassandra's coordinated
* transfer then propagates the data to all other replicas, avoiding the duplicate-row updates that would occur if each
* replica independently streamed the data to its peers.
* </p>
*/
public class TrackedDirectStreamSession extends StreamSession<TransportContext.DirectDataBulkWriterContext>
Comment thread
sarankk marked this conversation as resolved.
{
/**
* Creates a tracked stream session. Every argument is forwarded unchanged to the
* {@link StreamSession} constructor; this class adds no state of its own.
*/
public TrackedDirectStreamSession(BulkWriterContext writerContext,
SortedSSTableWriter sstableWriter,
TransportContext.DirectDataBulkWriterContext transportContext,
String sessionID,
Range<BigInteger> tokenRange,
ReplicaAwareFailureHandler<RingInstance> failureHandler,
ExecutorService executorService)
{
super(writerContext, sstableWriter, transportContext, sessionID, tokenRange, failureHandler, executorService);
}

/**
* Placeholder: tracked streaming is not implemented yet, so this hook always fails.
*
* @param sstables ignored by this placeholder
* @throws UnsupportedOperationException always
*/
@Override
protected void onSSTablesProduced(Set<SSTableDescriptor> sstables)
{
throw new UnsupportedOperationException("TrackedDirectStreamSession is not yet implemented");
}

/**
* Placeholder: tracked streaming is not implemented yet, so finalizing always fails.
*
* @return never returns normally
* @throws UnsupportedOperationException always
*/
@Override
protected StreamResult doFinalizeStream()
{
throw new UnsupportedOperationException("TrackedDirectStreamSession is not yet implemented");
}

/**
* Placeholder: tracked streaming is not implemented yet, so this call always fails.
*
* @throws UnsupportedOperationException always
*/
@Override
protected void sendRemainingSSTables()
{
throw new UnsupportedOperationException("TrackedDirectStreamSession is not yet implemented");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
import o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse;
import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
import org.apache.cassandra.spark.exception.TimeSkewTooLargeException;
import org.apache.cassandra.spark.utils.CqlUtils;

import static org.apache.cassandra.spark.TestUtils.range;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -75,6 +77,50 @@ public static CassandraClusterInfo mockClusterInfoForTimeSkewTest(int allowanceM
return new MockClusterInfoForTimeSkew(allowanceMinutes, remoteNow);
}

@Test
void testGetTrackedReplicationType()
{
String schema = "CREATE KEYSPACE mykeyspace "
+ "WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': '3'}"
+ " AND durable_writes = true"
+ " AND replication_type = 'tracked';";
CassandraClusterInfo ci = mockClusterInfoWithSchema("mykeyspace", schema);
assertThat(ci.getReplicationType())

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests are not testing the changes to CassandraClusterInfo, because MockClusterInfoForSchema overrides getReplicationType to call CqlUtils.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes; otherwise we would need to mock the Sidecar client calls to get the schema, and this avoids that.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have in-jvm tests to test the CassandraClusterInfo code path then? Tests in CassandraClusterInfoTest.java should test CassandraClusterInfo. If that is difficult, we can have in-jvm tests which go through this class.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@skoppu22 I plan to add in-jvm dtest in the follow up PR which adds the new stream session for write

.describedAs("Keyspace with replication_type = 'tracked' should return 'tracked'")
.isEqualTo("tracked");
}

@Test
void testGetUntrackedReplicationType()
{
    // CREATE KEYSPACE statement that explicitly declares replication_type = 'untracked'
    String createKeyspace = "CREATE KEYSPACE k "
                            + "WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': '3'}"
                            + " AND durable_writes = true"
                            + " AND replication_type = 'untracked';";
    String replicationType = mockClusterInfoWithSchema("k", createKeyspace).getReplicationType();
    assertThat(replicationType)
        .describedAs("Keyspace with replication_type = 'untracked' should return 'untracked'")
        .isEqualTo("untracked");
}

@Test
void testGetReplicationTypeReturnsNullWhenPropertyAbsent()
{
    // Schema without replication_type — pre-mutation-tracking clusters
    String createKeyspace = "CREATE KEYSPACE k "
                            + "WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': '3'}"
                            + " AND durable_writes = true;";
    String replicationType = mockClusterInfoWithSchema("k", createKeyspace).getReplicationType();
    assertThat(replicationType)
        .describedAs("Keyspace without replication_type property should return null")
        .isNull();
}

/**
* Builds a {@link MockClusterInfoForSchema} for the given keyspace name and schema text.
*
* @param keyspace  keyspace name handed to the stub
* @param schemaStr schema text the stub returns and parses
* @return the stub, typed as {@link CassandraClusterInfo}
*/
private static CassandraClusterInfo mockClusterInfoWithSchema(String keyspace, String schemaStr)
{
return new MockClusterInfoForSchema(keyspace, schemaStr);
}

private static class MockClusterInfoForTimeSkew extends CassandraClusterInfo
{
private CassandraContext cassandraContext;
Expand Down Expand Up @@ -107,4 +153,39 @@ private void mockCassandraContext(int allowanceMinutes, Instant remoteNow)
when(cassandraContext.sidecarPort()).thenReturn(9043);
}
}

/**
* Minimal ClusterInfo stub that returns a fixed keyspace schema string, used by the
* replication type tests in this class.
*
* <p>This stub overrides {@code getReplicationType()} itself, so those tests exercise
* {@link CqlUtils#extractReplicationType(String, String)} on the fixed schema rather than the
* body of {@link CassandraClusterInfo#getReplicationType()}.
*/
private static class MockClusterInfoForSchema extends CassandraClusterInfo
{
// Keyspace name and schema text supplied by the test; both are fixed for the stub's lifetime
private final String keyspaceName;
private final String schemaStr;

MockClusterInfoForSchema(String keyspace, String schemaStr)
{
// No BulkSparkConf is supplied to the parent; the keyspace name is kept in this stub instead
super((BulkSparkConf) null);
this.keyspaceName = keyspace;
this.schemaStr = schemaStr;
}

@Override
protected CassandraContext buildCassandraContext()
{
// Deep-stubbed Mockito mock, so no real CassandraContext is built
return mock(CassandraContext.class, RETURNS_DEEP_STUBS);
}

@Override
public String getKeyspaceSchema(boolean cached)
{
// The cached flag is ignored; the fixed schema text is always returned
return schemaStr;
}

@Override
public String getReplicationType()
{
// Parses the fixed schema with the stub's own keyspace name.
// NOTE(review): presumably overridden because the parent is constructed with a null conf — confirm
return CqlUtils.extractReplicationType(schemaStr, keyspaceName);
}
}
}
Loading
Loading