From 6ee5a99199599df8e414107cd90fb163491ba193 Mon Sep 17 00:00:00 2001 From: Shailaja Koppu Date: Thu, 4 Jun 2026 17:17:45 +0100 Subject: [PATCH] integration tests --- .../analytics/BasicRowsReadTest.java | 92 +++++++++ .../analytics/ClusteringOrderByReadTest.java | 94 +++++++++ .../analytics/MaxIndexIntervalReadTest.java | 88 ++++++++ .../analytics/NestedCollectionsReadTest.java | 142 +++++++++++++ .../analytics/PartitionKeyFilterReadTest.java | 117 +++++++++++ ...SharedClusterSparkIntegrationTestBase.java | 49 +++++ .../StaticColumnSharedValueReadTest.java | 128 ++++++++++++ .../analytics/SumAggregationReadTest.java | 85 ++++++++ .../analytics/TableUnionReadTest.java | 100 +++++++++ .../analytics/TombstonesReadTest.java | 193 ++++++++++++++++++ 10 files changed, 1088 insertions(+) create mode 100644 cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BasicRowsReadTest.java create mode 100644 cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ClusteringOrderByReadTest.java create mode 100644 cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/MaxIndexIntervalReadTest.java create mode 100644 cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/NestedCollectionsReadTest.java create mode 100644 cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/PartitionKeyFilterReadTest.java create mode 100644 cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/StaticColumnSharedValueReadTest.java create mode 100644 cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SumAggregationReadTest.java create mode 100644 cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/TableUnionReadTest.java create mode 100644 cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/TombstonesReadTest.java diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BasicRowsReadTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BasicRowsReadTest.java new file mode 100644 index 000000000..a562528f1 --- /dev/null +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BasicRowsReadTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.analytics; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import org.junit.jupiter.api.Test; + +import org.apache.cassandra.sidecar.testing.QualifiedName; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +import static org.apache.cassandra.testing.TestUtils.DC1_RF1; +import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE; +import static org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Inserts rows across multiple SSTables (flushing between batches) and asserts the bulk reader + * returns every (a,b) -> c triple, exercising the multi-SSTable merge path. 
+ */ +class BasicRowsReadTest extends SharedClusterSparkIntegrationTestBase +{ + static final int NUM_SSTABLES = 5; + static final int NUM_ROWS = 5; + static final int NUM_COLS = 4; + + QualifiedName table = uniqueTestTableFullName(TEST_KEYSPACE, "basic_rows"); + Map expected = new HashMap<>(); + + @Test + void testAllRowsReturned() + { + Dataset data = bulkReaderDataFrame(table).load(); + assertThat(data.count()).isEqualTo((long) NUM_SSTABLES * NUM_ROWS * NUM_COLS); + + List rows = data.collectAsList(); + assertThat(rows).hasSize(NUM_SSTABLES * NUM_ROWS * NUM_COLS); + for (Row row : rows) + { + String key = row.getLong(0) + ":" + row.getLong(1); + assertThat(expected).containsKey(key); + assertThat(row.getLong(2)).isEqualTo(expected.get(key)); + } + } + + @Override + protected void initializeSchemaForTest() + { + createTestKeyspace(TEST_KEYSPACE, DC1_RF1); + createTestTable(table, "CREATE TABLE IF NOT EXISTS %s (a bigint, b bigint, c bigint, PRIMARY KEY (a, b));"); + disableAutoCompaction(table); + + Random random = new Random(0); + long partitionKey = 0; + for (int s = 0; s < NUM_SSTABLES; s++) + { + for (int r = 0; r < NUM_ROWS; r++) + { + for (long clusteringKey = 0; clusteringKey < NUM_COLS; clusteringKey++) + { + long value = random.nextInt(101); + expected.put(partitionKey + ":" + clusteringKey, value); + execute(String.format("INSERT INTO %s (a, b, c) VALUES (%d, %d, %d);", + table, partitionKey, clusteringKey, value)); + } + partitionKey++; + } + flushKeyspace(table); + } + } +} diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ClusteringOrderByReadTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ClusteringOrderByReadTest.java new file mode 100644 index 000000000..9dab64c41 --- /dev/null +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ClusteringOrderByReadTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.analytics; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import org.junit.jupiter.api.Test; + +import org.apache.cassandra.sidecar.testing.QualifiedName; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +import static org.apache.cassandra.testing.TestUtils.DC1_RF1; +import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE; +import static org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Verifies the bulk reader correctly handles {@code WITH CLUSTERING ORDER BY (b DESC)} and does + * not return duplicate rows when the same clustering key is written in a later SSTable — the + * later write must overwrite on the merge path. 
+ */ +class ClusteringOrderByReadTest extends SharedClusterSparkIntegrationTestBase +{ + static final int NUM_ROWS = 5; + static final int NUM_COLS = 4; + + QualifiedName table = uniqueTestTableFullName(TEST_KEYSPACE, "clust_order"); + Map expected = new HashMap<>(); + + @Test + void testClusteringOrderByNoDuplicates() + { + Dataset data = bulkReaderDataFrame(table).load(); + assertThat(data.count()).isEqualTo(expected.size()); + + for (Row row : data.collectAsList()) + { + String key = row.getLong(0) + ":" + row.getLong(1); + assertThat(expected).as("unexpected key %s", key).containsKey(key); + assertThat(row.getLong(2)).isEqualTo(expected.get(key)); + } + } + + @Override + protected void initializeSchemaForTest() + { + createTestKeyspace(TEST_KEYSPACE, DC1_RF1); + createTestTable(table, "CREATE TABLE IF NOT EXISTS %s (a bigint, b bigint, c bigint, " + + "PRIMARY KEY (a, b)) WITH CLUSTERING ORDER BY (b DESC);"); + disableAutoCompaction(table); + + Random random = new Random(0); + long partitionKey = 0; + for (int r = 0; r < NUM_ROWS; r++) + { + for (long clusteringKey = 0; clusteringKey < NUM_COLS; clusteringKey++) + { + long value = random.nextInt(101); + expected.put(partitionKey + ":" + clusteringKey, value); + execute(String.format("INSERT INTO %s (a, b, c) VALUES (%d, %d, %d);", + table, partitionKey, clusteringKey, value)); + } + partitionKey++; + } + flushKeyspace(table); + + // rewrite smallest clustering key (0, 0) in a separate SSTable — would produce duplicates + // if WITH CLUSTERING ORDER BY were not honored on the merge path + long rewriteValue = random.nextInt(101); + expected.put("0:0", rewriteValue); + execute(String.format("INSERT INTO %s (a, b, c) VALUES (0, 0, %d);", table, rewriteValue)); + flushKeyspace(table); + } +} diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/MaxIndexIntervalReadTest.java 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/MaxIndexIntervalReadTest.java new file mode 100644 index 000000000..6366a5360 --- /dev/null +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/MaxIndexIntervalReadTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.analytics; + +import java.util.Random; + +import org.junit.jupiter.api.Test; + +import org.apache.cassandra.sidecar.testing.QualifiedName; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +import static org.apache.cassandra.testing.TestUtils.DC1_RF1; +import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE; +import static org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Creates a table with custom {@code max_index_interval=4096 AND min_index_interval=32}, loads a + * larger dataset across multiple SSTables, and performs a bulk read. + * + *

<p>Coverage scope. This test covers: + * <ul> + *   <li>that the bulk reader does not error or bail out on a table with non-default index + *       intervals, and</li> + *   <li>that the read returns the expected row count.</li> + * </ul>
+ */ +class MaxIndexIntervalReadTest extends SharedClusterSparkIntegrationTestBase +{ + static final int NUM_SSTABLES = 10; + static final int NUM_ROWS = 100; + static final int NUM_COLS = 8; + + QualifiedName table = uniqueTestTableFullName(TEST_KEYSPACE, "max_idx_interval"); + long expectedRowCount; + + @Test + void testBulkReadSucceedsWithCustomIndexIntervals() + { + Dataset data = bulkReaderDataFrame(table).load(); + assertThat(data.count()).isEqualTo(expectedRowCount); + } + + @Override + protected void initializeSchemaForTest() + { + createTestKeyspace(TEST_KEYSPACE, DC1_RF1); + createTestTable(table, "CREATE TABLE IF NOT EXISTS %s (a bigint, b bigint, c bigint, PRIMARY KEY (a, b)) " + + "WITH max_index_interval=4096 AND min_index_interval=32;"); + disableAutoCompaction(table); + + Random random = new Random(0); + long partitionKey = 0; + for (int s = 0; s < NUM_SSTABLES; s++) + { + for (int r = 0; r < NUM_ROWS; r++) + { + for (long clusteringKey = 0; clusteringKey < NUM_COLS; clusteringKey++) + { + long value = random.nextInt(101); + execute(String.format("INSERT INTO %s (a, b, c) VALUES (%d, %d, %d);", + table, partitionKey, clusteringKey, value)); + } + partitionKey++; + } + flushKeyspace(table); + } + expectedRowCount = (long) NUM_SSTABLES * NUM_ROWS * NUM_COLS; + } +} diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/NestedCollectionsReadTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/NestedCollectionsReadTest.java new file mode 100644 index 000000000..3583468ad --- /dev/null +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/NestedCollectionsReadTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.analytics; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.stream.Collectors; + +import org.junit.jupiter.api.Test; + +import org.apache.cassandra.sidecar.testing.QualifiedName; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import scala.collection.JavaConverters; + +import static org.apache.cassandra.testing.TestUtils.DC1_RF1; +import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE; +import static org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Exercises the bulk reader on a nested-collection column — + * {@code map>>}. 
+ */ +class NestedCollectionsReadTest extends SharedClusterSparkIntegrationTestBase +{ + static final int NUM_SSTABLES = 5; + static final int NUM_ROWS = 50; + static final int MAP_ENTRIES = 5; + static final int LIST_ENTRIES = 10; + + QualifiedName table = uniqueTestTableFullName(TEST_KEYSPACE, "nested"); + Map>> expected = new HashMap<>(); + + @Test + void testNestedRead() + { + Dataset data = bulkReaderDataFrame(table).load(); + assertThat(data.count()).isEqualTo(expected.size()); + + for (Row row : data.collectAsList()) + { + long key = row.getLong(0); + Map> expectedRow = expected.get(key); + assertThat(expectedRow).as("Unexpected key in Spark output: %s", key).isNotNull(); + + Map actualOuter = JavaConverters.mapAsJavaMapConverter(row.getMap(1)).asJava(); + assertThat(actualOuter).hasSize(expectedRow.size()); + + for (Map.Entry outerEntry : actualOuter.entrySet()) + { + int outerKey = ((Number) outerEntry.getKey()).intValue(); + assertThat(expectedRow).containsKey(outerKey); + + @SuppressWarnings("unchecked") + scala.collection.Seq innerSeq = (scala.collection.Seq) outerEntry.getValue(); + List actualInner = JavaConverters.seqAsJavaListConverter(innerSeq).asJava().stream() + .map(v -> ((Number) v).longValue()) + .collect(Collectors.toList()); + assertThat(actualInner).isEqualTo(expectedRow.get(outerKey)); + } + } + } + + @Override + protected void initializeSchemaForTest() + { + createTestKeyspace(TEST_KEYSPACE, DC1_RF1); + createTestTable(table, "CREATE TABLE IF NOT EXISTS %s (a bigint, b map>>, " + + "PRIMARY KEY (a));"); + disableAutoCompaction(table); + + Random random = new Random(0); + for (int s = 0; s < NUM_SSTABLES; s++) + { + for (long partitionKey = 0; partitionKey < NUM_ROWS; partitionKey++) + { + Map> nested = new LinkedHashMap<>(); + StringBuilder mapCql = new StringBuilder("{"); + Set usedOuterKeys = new HashSet<>(); + int mapIdx = 0; + for (int o = 0; o < MAP_ENTRIES; o++) + { + int outerKey; + do + { + outerKey = random.nextInt(100_000); + } 
while (!usedOuterKeys.add(outerKey)); + + List inner = new ArrayList<>(LIST_ENTRIES); + StringBuilder listCql = new StringBuilder("["); + for (int i = 0; i < LIST_ENTRIES; i++) + { + long value = Math.abs(random.nextLong()) % 100_000_000L; + inner.add(value); + if (i > 0) + { + listCql.append(","); + } + listCql.append(value); + } + listCql.append("]"); + + nested.put(outerKey, inner); + if (mapIdx++ > 0) + { + mapCql.append(","); + } + mapCql.append(outerKey).append(":").append(listCql); + } + mapCql.append("}"); + + execute(String.format("INSERT INTO %s (a, b) VALUES (%d, %s);", table, partitionKey, mapCql)); + expected.put(partitionKey, nested); + } + flushKeyspace(table); + } + } +} diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/PartitionKeyFilterReadTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/PartitionKeyFilterReadTest.java new file mode 100644 index 000000000..0e9c4e24c --- /dev/null +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/PartitionKeyFilterReadTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.analytics; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.stream.Collectors; + +import org.junit.jupiter.api.Test; + +import org.apache.cassandra.sidecar.testing.QualifiedName; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.functions; + +import static org.apache.cassandra.testing.TestUtils.DC1_RF1; +import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE; +import static org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Verifies partition-key pushdown on the bulk reader, both single-value ({@code a = N}) and + * multi-value ({@code a IN (...)}) forms. + * + *

The OSS bulk reader's {@code CassandraDataSource} accepts partition-key predicates as + * standard Spark filters; this test uses {@code Dataset#filter} to express them. + */ +class PartitionKeyFilterReadTest extends SharedClusterSparkIntegrationTestBase +{ + static final int NUM_SSTABLES = 5; + static final int NUM_ROWS = 5; + static final int NUM_COLS = 4; + + QualifiedName table = uniqueTestTableFullName(TEST_KEYSPACE, "key_filter"); + Map allRows = new HashMap<>(); + + @Test + void testSinglePartitionKeyFilter() + { + Dataset data = bulkReaderDataFrame(table).load().filter(functions.col("a").equalTo(1L)); + Map expected = filterByKeys(Set.of(1L)); + assertRowsMatch(data, expected); + } + + @Test + void testMultiplePartitionKeyFilter() + { + Dataset data = bulkReaderDataFrame(table).load().filter(functions.col("a").isin(1L, 2L, 3L)); + Map expected = filterByKeys(Set.of(1L, 2L, 3L)); + assertRowsMatch(data, expected); + } + + private Map filterByKeys(Set keys) + { + return allRows.entrySet().stream() + .filter(e -> keys.contains(Long.parseLong(e.getKey().split(":")[0]))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + private void assertRowsMatch(Dataset data, Map expected) + { + List rows = data.collectAsList(); + assertThat(rows).hasSize(expected.size()); + for (Row row : rows) + { + String key = row.getLong(0) + ":" + row.getLong(1); + assertThat(expected).as("unexpected key %s", key).containsKey(key); + assertThat(row.getLong(2)).isEqualTo(expected.get(key)); + } + } + + @Override + protected void initializeSchemaForTest() + { + createTestKeyspace(TEST_KEYSPACE, DC1_RF1); + createTestTable(table, "CREATE TABLE IF NOT EXISTS %s (a bigint, b bigint, c bigint, PRIMARY KEY (a, b));"); + disableAutoCompaction(table); + + Random random = new Random(0); + long partitionKey = 0; + for (int s = 0; s < NUM_SSTABLES; s++) + { + for (int r = 0; r < NUM_ROWS; r++) + { + for (long clusteringKey = 0; clusteringKey < NUM_COLS; clusteringKey++) + 
{ + long value = random.nextInt(101); + allRows.put(partitionKey + ":" + clusteringKey, value); + execute(String.format("INSERT INTO %s (a, b, c) VALUES (%d, %d, %d);", + table, partitionKey, clusteringKey, value)); + } + partitionKey++; + } + flushKeyspace(table); + } + } +} diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java index c6a98ca73..ac03d78ca 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java @@ -36,6 +36,8 @@ import io.vertx.junit5.VertxExtension; import org.apache.cassandra.bridge.CassandraBridge; import org.apache.cassandra.bridge.CassandraBridgeFactory; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.ICoordinator; import org.apache.cassandra.sidecar.testing.QualifiedName; import org.apache.cassandra.sidecar.testing.SharedClusterIntegrationTestBase; import org.apache.spark.SparkConf; @@ -140,6 +142,53 @@ protected DataFrameWriter bulkWriterDataFrameWriter(Dataset df, Qualif return sparkTestUtils.defaultBulkWriterDataFrameWriter(df, tableName, additionalOptions); } + /** + * Flush the memtable for {@code keyspace} on every node in the cluster. + */ + protected void flushKeyspace(String keyspace) + { + cluster.stream().forEach(instance -> instance.flush(keyspace)); + } + + /** + * Flush the memtable for the keyspace containing {@code table} on every node in the cluster. + */ + protected void flushKeyspace(QualifiedName table) + { + flushKeyspace(table.keyspace()); + } + + /** + * Disable auto-compaction for {@code keyspace} on every node. 
Useful for tests that + * intentionally produce multiple SSTables via per-batch flushes — without this, background + * compaction can merge them between the last flush and the bulk reader's snapshot, silently + * degrading a multi-SSTable merge test to a single-SSTable read. + */ + protected void disableAutoCompaction(String keyspace) + { + cluster.stream().forEach(instance -> + instance.nodetoolResult("disableautocompaction", keyspace).asserts().success()); + } + + /** + * Disable auto-compaction for the keyspace containing {@code table} on every node. + */ + protected void disableAutoCompaction(QualifiedName table) + { + disableAutoCompaction(table.keyspace()); + } + + /** + * Execute a CQL statement on the first running instance at consistency level {@link + * ConsistencyLevel#ALL}. Use for test-setup writes where every replica must observe the data + * before the bulk reader runs. + */ + protected void execute(String query) + { + ICoordinator coordinator = cluster.getFirstRunningInstance().coordinator(); + coordinator.execute(query, ConsistencyLevel.ALL); + } + protected SparkConf getOrCreateSparkConf() { if (sparkConf == null) diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/StaticColumnSharedValueReadTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/StaticColumnSharedValueReadTest.java new file mode 100644 index 000000000..235350b98 --- /dev/null +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/StaticColumnSharedValueReadTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.analytics; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import org.junit.jupiter.api.Test; + +import org.apache.cassandra.sidecar.testing.QualifiedName; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +import static org.apache.cassandra.testing.TestUtils.DC1_RF1; +import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE; +import static org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Verifies the bulk reader correctly surfaces the customer-facing STATIC column semantic — a + * static cell is partition-scoped, so every clustering row of the partition reads back the same + * static value, and last-write-wins applies across the partition's static cell. + * + *

<p>This test writes a fresh random static value on every insert; the read-side expected value is + * therefore the last static value written for each partition. Cassandra's last-write-wins (LWW) resolution on + * the static cell is what surfaces that value on read; if LWW for statics regressed, every + * clustering row of the partition would still report the same static value (because static cells are + * partition-scoped), but that value would no longer equal the last-written one. + * + * <p>
{@code BulkReaderTest.testReadNullStaticColumn} exercises STATIC schema and null-handling + * but uses one clustering row per partition, so the shared-across-clustering-rows behaviour + * isn't exercised there. This test fills that gap by writing multiple clustering rows per + * partition and asserting all of them read back the partition's last-written static value. + */ +class StaticColumnSharedValueReadTest extends SharedClusterSparkIntegrationTestBase +{ + static final int NUM_PARTITIONS = 10; + static final int NUM_CLUSTERING_ROWS = 20; + + QualifiedName table = uniqueTestTableFullName(TEST_KEYSPACE, "static_shared"); + + /** partition key -> the static value every clustering row of that partition must read back */ + Map expectedStaticByPartition = new HashMap<>(); + /** (partition, clustering) -> the per-row regular column value */ + Map expectedRegularValue = new HashMap<>(); + + @Test + void testStaticValueSharedAcrossClusteringRows() + { + Dataset data = bulkReaderDataFrame(table).load(); + assertThat(data.count()).isEqualTo((long) NUM_PARTITIONS * NUM_CLUSTERING_ROWS); + + // Sanity: every clustering row of partition P must report the same static value. 
+ Map staticPerPartitionFromRead = new HashMap<>(); + for (Row row : data.collectAsList()) + { + long partitionKey = row.getLong(0); + long clusteringKey = row.getLong(1); + long staticValue = row.getLong(2); + long regularValue = row.getLong(3); + + assertThat(expectedStaticByPartition).as("unknown partition %d", partitionKey).containsKey(partitionKey); + assertThat(staticValue) + .as("partition %d clustering %d: STATIC value mismatch", partitionKey, clusteringKey) + .isEqualTo(expectedStaticByPartition.get(partitionKey)); + + String key = partitionKey + ":" + clusteringKey; + assertThat(expectedRegularValue).containsKey(key); + assertThat(regularValue).isEqualTo(expectedRegularValue.get(key)); + + Long previouslySeen = staticPerPartitionFromRead.put(partitionKey, staticValue); + if (previouslySeen != null) + { + assertThat(staticValue) + .as("partition %d returned divergent static values across clustering rows", partitionKey) + .isEqualTo(previouslySeen); + } + } + + // Every partition must have been seen at least once on the read side. + assertThat(staticPerPartitionFromRead.keySet()).isEqualTo(expectedStaticByPartition.keySet()); + } + + @Override + protected void initializeSchemaForTest() + { + createTestKeyspace(TEST_KEYSPACE, DC1_RF1); + createTestTable(table, "CREATE TABLE IF NOT EXISTS %s (a bigint, b bigint, c bigint static, d bigint, " + + "PRIMARY KEY (a, b));"); + disableAutoCompaction(table); + + // Pick a fresh random static on every insert; the partition's expected static is the + // LAST value written (Cassandra resolves the static cell as LWW). + // Seeded RNG for reproducibility on failure. 
+ Random random = new Random(0); + for (long partitionKey = 0; partitionKey < NUM_PARTITIONS; partitionKey++) + { + for (long clusteringKey = 0; clusteringKey < NUM_CLUSTERING_ROWS; clusteringKey++) + { + long staticValue = Math.abs(random.nextLong()) % 100_000_000L; + long regularValue = partitionKey * 1000 + clusteringKey; + expectedRegularValue.put(partitionKey + ":" + clusteringKey, regularValue); + expectedStaticByPartition.put(partitionKey, staticValue); + execute(String.format("INSERT INTO %s (a, b, c, d) VALUES (%d, %d, %d, %d);", + table, partitionKey, clusteringKey, staticValue, regularValue)); + } + } + flushKeyspace(table); + } +} diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SumAggregationReadTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SumAggregationReadTest.java new file mode 100644 index 000000000..c23c3fb0c --- /dev/null +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SumAggregationReadTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.util.Random;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import static org.apache.cassandra.testing.TestUtils.DC1_RF1;
+import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
+import static org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName;
+import static org.apache.spark.sql.functions.sum;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Bulk reads a multi-SSTable table and verifies {@code SUM(c)} via Spark matches the sum of
+ * values written via CQL, exercising the bulk reader's multi-SSTable merge path against a
+ * Spark aggregation.
+ */
+class SumAggregationReadTest extends SharedClusterSparkIntegrationTestBase
+{
+    static final int NUM_SSTABLES = 5; // number of insert batches, each followed by a flush
+    static final int NUM_ROWS = 5;     // partitions written per batch
+    static final int NUM_COLS = 4;     // clustering rows written per partition
+
+    QualifiedName table = uniqueTestTableFullName(TEST_KEYSPACE, "sum_agg");
+    long expectedSum; // running total of every value of c written in initializeSchemaForTest
+
+    @Test
+    void testSumMatches()
+    {
+        Dataset<Row> data = bulkReaderDataFrame(table).load();
+        Row row = data.agg(sum("c").alias("sum_c")).collectAsList().get(0); // single aggregate row
+        assertThat(row.getLong(0)).isEqualTo(expectedSum);
+    }
+
+    @Override
+    protected void initializeSchemaForTest()
+    {
+        createTestKeyspace(TEST_KEYSPACE, DC1_RF1);
+        createTestTable(table, "CREATE TABLE IF NOT EXISTS %s (a bigint, b bigint, c bigint, PRIMARY KEY (a, b));");
+        disableAutoCompaction(table);
+
+        Random random = new Random(0); // fixed seed: the same values are written on every run
+        long partitionKey = 0;         // never reset, so each batch writes brand-new partitions
+        long sum = 0;
+        for (int s = 0; s < NUM_SSTABLES; s++)
+        {
+            for (int r = 0; r < NUM_ROWS; r++)
+            {
+                for (long clusteringKey = 0; clusteringKey < NUM_COLS; clusteringKey++)
+                {
+                    long value = random.nextInt(101); // uniform in [0, 100]
+                    sum += value;
+                    execute(String.format("INSERT INTO %s (a, b, c) VALUES (%d, %d, %d);",
+                                          table, partitionKey, clusteringKey, value));
+                }
+                partitionKey++;
+            }
+            flushKeyspace(table);
+        }
+        expectedSum = sum;
+    }
+}
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/TableUnionReadTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/TableUnionReadTest.java
new file mode 100644
index 000000000..17c255272
--- /dev/null
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/TableUnionReadTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import static org.apache.cassandra.testing.TestUtils.DC1_RF1;
+import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
+import static org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Loads two tables with the same schema via the bulk reader and verifies Spark's
+ * {@code Dataset#union} returns the combined rowset.
+ */
+class TableUnionReadTest extends SharedClusterSparkIntegrationTestBase
+{
+    QualifiedName table1 = uniqueTestTableFullName(TEST_KEYSPACE, "union1");
+    QualifiedName table2 = uniqueTestTableFullName(TEST_KEYSPACE, "union2");
+
+    static final List<Object[]> VALUES_1 = Arrays.asList( // {pk1, col1} rows written to table1
+        new Object[]{1L, "a"}, new Object[]{2L, "b"}, new Object[]{3L, "c"});
+    static final List<Object[]> VALUES_2 = Arrays.asList( // {pk1, col1} rows written to table2; keys disjoint
+        new Object[]{4L, "d"}, new Object[]{5L, "e"}, new Object[]{6L, "f"});
+
+    @Test
+    void testTableUnion()
+    {
+        Dataset<Row> data1 = bulkReaderDataFrame(table1).load();
+        Dataset<Row> data2 = bulkReaderDataFrame(table2).load();
+        Dataset<Row> union = data1.union(data2);
+
+        Map<Long, String> expected = new HashMap<>();
+        for (Object[] v : VALUES_1)
+        {
+            expected.put((Long) v[0], (String) v[1]);
+        }
+
+        for (Object[] v : VALUES_2)
+        {
+            expected.put((Long) v[0], (String) v[1]);
+        }
+
+        assertThat(union.count()).isEqualTo(expected.size());
+        for (Row row : union.collectAsList())
+        {
+            long key = row.getLong(0);
+            assertThat(expected).containsKey(key);
+            assertThat(row.getString(1)).isEqualTo(expected.get(key));
+        }
+    }
+
+    @Override
+    protected void initializeSchemaForTest()
+    {
+        createTestKeyspace(TEST_KEYSPACE, DC1_RF1);
+        String ddl = "CREATE TABLE IF NOT EXISTS %s (pk1 bigint, col1 text, PRIMARY KEY (pk1));";
+        createTestTable(table1, ddl);
+        createTestTable(table2, ddl);
+        disableAutoCompaction(table1);
+
+        for (Object[] v : VALUES_1)
+        {
+            execute(String.format("INSERT INTO %s (pk1, col1) VALUES (%d, '%s');", table1, (long) v[0], v[1]));
+        }
+
+        for (Object[] v : VALUES_2)
+        {
+            execute(String.format("INSERT INTO %s (pk1, col1) VALUES (%d, '%s');", table2, (long) v[0], v[1]));
+        }
+
+        flushKeyspace(table1);
+    }
+}
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/TombstonesReadTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/TombstonesReadTest.java
new file mode 100644
index 000000000..a7032dd17
--- /dev/null
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/TombstonesReadTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import static org.apache.cassandra.testing.TestUtils.DC1_RF1;
+import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
+import static org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Verifies that the bulk reader correctly filters tombstoned rows in four scenarios — basic
+ * (no deletes, baseline), partition tombstones, row tombstones, and range tombstones. Each
+ * test owns a distinct table.
+ *
+ * <p>The basic scenario writes {@code NUM_ROWS * NUM_COLS} inserts at sparse random partition/clustering
+ * keys (collisions are statistically negligible at the [0, 1e8) range). The partition/row/range
+ * tombstone scenarios use dense sequential keys to keep the delete-by-specific-clustering-key
+ * scenarios deterministic.
+ */
+class TombstonesReadTest extends SharedClusterSparkIntegrationTestBase
+{
+    static final int NUM_ROWS = 100; // partitions per tombstone scenario
+    static final int NUM_COLS = 10;  // clustering rows per partition
+
+    QualifiedName basicTable = uniqueTestTableFullName(TEST_KEYSPACE, "tomb_basic");
+    QualifiedName partitionTombstoneTable = uniqueTestTableFullName(TEST_KEYSPACE, "tomb_part");
+    QualifiedName rowTombstoneTable = uniqueTestTableFullName(TEST_KEYSPACE, "tomb_row");
+    QualifiedName rangeTombstoneTable = uniqueTestTableFullName(TEST_KEYSPACE, "tomb_range");
+
+    Map<String, Long> basicValues = new HashMap<>();     // "a:b" -> c for every row expected to survive
+    Map<String, Long> partitionValues = new HashMap<>();
+    Map<String, Long> rowValues = new HashMap<>();
+    Map<String, Long> rangeValues = new HashMap<>();
+
+    @Test
+    void testBasic()
+    {
+        assertRows(basicTable, basicValues);
+    }
+
+    @Test
+    void testPartitionTombstones()
+    {
+        assertRows(partitionTombstoneTable, partitionValues);
+    }
+
+    @Test
+    void testRowTombstones()
+    {
+        assertRows(rowTombstoneTable, rowValues);
+    }
+
+    @Test
+    void testRangeTombstones()
+    {
+        assertRows(rangeTombstoneTable, rangeValues);
+    }
+
+    private void assertRows(QualifiedName table, Map<String, Long> expected)
+    {
+        Dataset<Row> data = bulkReaderDataFrame(table).load();
+        assertThat(data.count()).isEqualTo(expected.size()); // no deleted row may leak through, none may be lost
+        for (Row row : data.collectAsList())
+        {
+            String key = row.getLong(0) + ":" + row.getLong(1);
+            assertThat(expected).as("unexpected key %s in table %s", key, table).containsKey(key);
+            assertThat(row.getLong(2)).isEqualTo(expected.get(key));
+        }
+    }
+
+    @Override
+    protected void initializeSchemaForTest()
+    {
+        createTestKeyspace(TEST_KEYSPACE, DC1_RF1);
+        String ddl = "CREATE TABLE IF NOT EXISTS %s (a bigint, b bigint, c bigint, PRIMARY KEY (a, b));";
+        createTestTable(basicTable, ddl);
+        createTestTable(partitionTombstoneTable, ddl);
+        createTestTable(rowTombstoneTable, ddl);
+        createTestTable(rangeTombstoneTable, ddl);
+        disableAutoCompaction(basicTable);
+
+        Random random = new Random(0);
+        AtomicInteger seedCounter = new AtomicInteger(); // hands out seeds 1, 2, 3 to the populate() calls
+
+        // Basic: sparse random partition+clustering keys in a single loop. NUM_ROWS * NUM_COLS
+        // chosen to match the volume of the other scenarios. floorMod keeps every draw in [0, 1e8).
+        for (int i = 0; i < NUM_ROWS * NUM_COLS; i++)
+        {
+            long partitionKey = Math.floorMod(random.nextLong(), 100_000_000L);
+            long clusteringKey = Math.floorMod(random.nextLong(), 100_000_000L);
+            long value = Math.floorMod(random.nextLong(), 100_000_000L);
+            execute(String.format("INSERT INTO %s (a, b, c) VALUES (%d, %d, %d);",
+                                  basicTable, partitionKey, clusteringKey, value));
+            basicValues.put(partitionKey + ":" + clusteringKey, value); // a key collision overwrites on both sides
+        }
+        flushKeyspace(basicTable);
+
+        // Partition tombstones: delete partitions [0, 25) — 25 of 100 partitions
+        populate(partitionTombstoneTable, partitionValues, new Random(seedCounter.incrementAndGet()));
+        flushKeyspace(partitionTombstoneTable); // flush the data before issuing the deletes
+        int numPartitionDeletes = 25;
+        for (long partitionKey = 0; partitionKey < numPartitionDeletes; partitionKey++)
+        {
+            execute(String.format("DELETE FROM %s WHERE a = %d;", partitionTombstoneTable, partitionKey));
+            for (long clusteringKey = 0; clusteringKey < NUM_COLS; clusteringKey++)
+            {
+                partitionValues.remove(partitionKey + ":" + clusteringKey);
+            }
+        }
+        flushKeyspace(partitionTombstoneTable);
+
+        // Row tombstones: delete b=1 from every partition
+        populate(rowTombstoneTable, rowValues, new Random(seedCounter.incrementAndGet()));
+        flushKeyspace(rowTombstoneTable);
+        for (long partitionKey = 0; partitionKey < NUM_ROWS; partitionKey++)
+        {
+            execute(String.format("DELETE FROM %s WHERE a = %d AND b = 1;", rowTombstoneTable, partitionKey));
+            rowValues.remove(partitionKey + ":1");
+        }
+        flushKeyspace(rowTombstoneTable);
+
+        // Range tombstones: delete b in [r1, r2) for a deterministic range per partition
+        populate(rangeTombstoneTable, rangeValues, new Random(seedCounter.incrementAndGet()));
+        flushKeyspace(rangeTombstoneTable);
+        Random rangeRand = new Random(99);
+        for (long partitionKey = 0; partitionKey < NUM_ROWS; partitionKey++)
+        {
+            int r1 = rangeRand.nextInt(NUM_COLS);
+            int r2 = rangeRand.nextInt(NUM_COLS);
+            if (r1 > r2) // order the bounds so that r1 <= r2
+            {
+                int tmp = r1;
+                r1 = r2;
+                r2 = tmp;
+            }
+
+            if (r1 == r2) // avoid an empty range: always delete at least one clustering row
+            {
+                r2 = Math.min(r1 + 1, NUM_COLS);
+            }
+
+            execute(String.format("DELETE FROM %s WHERE a = %d AND b >= %d AND b < %d;",
+                                  rangeTombstoneTable, partitionKey, r1, r2));
+            for (int clusteringKey = r1; clusteringKey < r2; clusteringKey++)
+            {
+                rangeValues.remove(partitionKey + ":" + clusteringKey);
+            }
+        }
+        flushKeyspace(rangeTombstoneTable);
+    }
+
+    private void populate(QualifiedName table, Map<String, Long> out, Random random)
+    {
+        for (long partitionKey = 0; partitionKey < NUM_ROWS; partitionKey++)
+        {
+            for (long clusteringKey = 0; clusteringKey < NUM_COLS; clusteringKey++)
+            {
+                long value = Math.floorMod(random.nextLong(), 100_000_000L); // always in [0, 1e8)
+                execute(String.format("INSERT INTO %s (a, b, c) VALUES (%d, %d, %d);",
+                                      table, partitionKey, clusteringKey, value));
+                out.put(partitionKey + ":" + clusteringKey, value); // dense keys: one entry per (a, b)
+            }
+        }
+    }
+}