Skip to content

Commit a4aa927

Browse files
authored
CASSANALYTICS-171: Avoid Spark 4 partitioning warnings during reads (#213)
Spark 4 ignores custom DataSource V2 Partitioning implementations and logs a warning. Cassandra scan partitions are token ranges rather than keyed groups, so report Spark's UnknownPartitioning directly while preserving the input partition count.

Patch by Liu Cao; reviewed by Francisco Guerrero, Saranya Krishnakumar for CASSANALYTICS-171
1 parent 87c553c commit a4aa927

3 files changed

Lines changed: 23 additions & 11 deletions

File tree

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
0.5.0
22
-----
3+
* Avoid Spark 4 partitioning warnings during bulk reads (CASSANALYTICS-171)
34
* Spark 4.0 Support (CASSANALYTICS-34)
45
* Add IAM credential support for S3 storage transport (CASSANALYTICS-155)
56
* Make BulkWriterConfig extensible (CASSANALYTICS-168)

cassandra-analytics-core/src/main/spark4/org/apache/cassandra/spark/sparksql/CassandraScanBuilder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
4444
import org.apache.spark.sql.connector.read.SupportsReportPartitioning;
4545
import org.apache.spark.sql.connector.read.partitioning.Partitioning;
46+
import org.apache.spark.sql.connector.read.partitioning.UnknownPartitioning;
4647
import org.apache.spark.sql.sources.Filter;
4748
import org.apache.spark.sql.types.StructType;
4849
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -121,7 +122,7 @@ public PartitionReaderFactory createReaderFactory()
121122
@Override
122123
public Partitioning outputPartitioning()
123124
{
124-
return new CassandraPartitioning(dataLayer);
125+
return new UnknownPartitioning(dataLayer.partitionCount());
125126
}
126127

127128
private List<PartitionKeyFilter> buildPartitionKeyFilters()

cassandra-analytics-core/src/main/spark4/org/apache/cassandra/spark/sparksql/CassandraPartitioning.java renamed to cassandra-analytics-core/src/test/spark4/org/apache/cassandra/spark/sparksql/CassandraScanBuilderTest.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,31 @@
1919

2020
package org.apache.cassandra.spark.sparksql;
2121

22+
import org.junit.jupiter.api.Test;
23+
2224
import org.apache.cassandra.spark.data.DataLayer;
2325
import org.apache.spark.sql.connector.read.partitioning.Partitioning;
26+
import org.apache.spark.sql.connector.read.partitioning.UnknownPartitioning;
27+
import org.apache.spark.sql.types.StructType;
28+
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
2429

25-
class CassandraPartitioning implements Partitioning
26-
{
27-
final DataLayer dataLayer;
30+
import static org.assertj.core.api.Assertions.assertThat;
31+
import static org.mockito.Mockito.mock;
32+
import static org.mockito.Mockito.when;
2833

29-
CassandraPartitioning(DataLayer dataLayer)
34+
class CassandraScanBuilderTest
35+
{
36+
@Test
37+
void outputPartitioningReportsUnknownPartitioningWithPartitionCount()
3038
{
31-
this.dataLayer = dataLayer;
32-
}
39+
DataLayer dataLayer = mock(DataLayer.class);
40+
when(dataLayer.partitionCount()).thenReturn(7);
41+
CassandraScanBuilder builder =
42+
new CassandraScanBuilder(dataLayer, new StructType(), CaseInsensitiveStringMap.empty());
3343

34-
@Override
35-
public int numPartitions()
36-
{
37-
return dataLayer.partitionCount();
44+
Partitioning partitioning = builder.outputPartitioning();
45+
46+
assertThat(partitioning).isInstanceOf(UnknownPartitioning.class);
47+
assertThat(partitioning.numPartitions()).isEqualTo(7);
3848
}
3949
}

0 commit comments

Comments
 (0)