Skip to content

Commit c877942

Browse files
CASSANALYTICS-167: Regenerate bloom filters for CQLSSTableWriter
Patch by Lukasz Antoniak; reviewed by Jon Haddad, Yifan Cai for CASSANALYTICS-167
1 parent a4aa927 commit c877942

10 files changed

Lines changed: 398 additions & 30 deletions

File tree

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
0.5.0
22
-----
3+
* Regenerate bloom filters for CQLSSTableWriter (CASSANALYTICS-167)
34
* Avoid Spark 4 partitioning warnings during bulk reads (CASSANALYTICS-171)
45
* Spark 4.0 Support (CASSANALYTICS-34)
56
* Add IAM credential support for S3 storage transport (CASSANALYTICS-155)

cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java

Lines changed: 59 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,14 @@
4343
import org.apache.cassandra.bridge.SSTableDescriptor;
4444
import org.apache.cassandra.spark.common.Digest;
4545
import org.apache.cassandra.spark.common.SSTables;
46+
import org.apache.cassandra.spark.data.FileSystemSSTable;
4647
import org.apache.cassandra.spark.data.FileType;
4748
import org.apache.cassandra.spark.data.LocalDataLayer;
4849
import org.apache.cassandra.spark.data.partitioner.Partitioner;
4950
import org.apache.cassandra.spark.reader.RowData;
5051
import org.apache.cassandra.spark.reader.StreamScanner;
5152
import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter;
53+
import org.apache.cassandra.spark.stats.BufferingInputStreamStats;
5254
import org.apache.cassandra.spark.utils.DigestAlgorithm;
5355
import org.jetbrains.annotations.NotNull;
5456
import org.jetbrains.annotations.Nullable;
@@ -227,6 +229,9 @@ public synchronized Map<Path, Digest> prepareSStablesToSend(@NotNull BulkWriterC
227229
};
228230
Set<Path> dataFilePaths = new HashSet<>();
229231
Map<Path, Digest> fileDigests = new HashMap<>();
232+
// FIXME: CQLSSTableWriter may produce incomplete Filter.db file, rebuilding it manually (see CASSANDRA-21423).
233+
// rebuild Filter.db files before calculating their digest
234+
rebuildFilterComponents(writerContext, sstableFilter);
230235
try (DirectoryStream<Path> stream = Files.newDirectoryStream(getOutDir(), sstableFilter))
231236
{
232237
for (Path path : stream)
@@ -281,22 +286,43 @@ public synchronized void close(BulkWriterContext writerContext) throws IOExcepti
281286
// Only process new SSTables produced during final flush
282287
DirectoryStream.Filter<Path> unhashedFilter = path -> !hashedFiles.contains(path);
283288

284-
for (Path dataFile : getDataFileStream(unhashedFilter))
289+
// FIXME: CQLSSTableWriter may produce incomplete Filter.db file, rebuilding it manually (see CASSANDRA-21423).
290+
rebuildFilterComponents(writerContext, unhashedFilter);
291+
292+
try (DirectoryStream<Path> dataFileStream = getDataFileStream(unhashedFilter))
285293
{
286-
// NOTE: We calculate file hashes before re-reading so that we know what we hashed
287-
// is what we validated. Then we send these along with the files and the
288-
// receiving end re-hashes the files to make sure they still match.
289-
Map<Path, Digest> newFileDigests = calculateFileDigestMap(dataFile);
290-
overallFileDigests.putAll(newFileDigests);
291-
newlyHashedFiles.addAll(newFileDigests.keySet());
292-
sstableCount += 1;
294+
for (Path dataFile : dataFileStream)
295+
{
296+
// NOTE: We calculate file hashes before re-reading so that we know what we hashed
297+
// is what we validated. Then we send these along with the files and the
298+
// receiving end re-hashes the files to make sure they still match.
299+
Map<Path, Digest> newFileDigests = calculateFileDigestMap(dataFile);
300+
overallFileDigests.putAll(newFileDigests);
301+
newlyHashedFiles.addAll(newFileDigests.keySet());
302+
sstableCount += 1;
303+
}
293304
}
294305
// Only calculate size for newly hashed files, not all files in overallFileDigests
295306
// (previously hashed files may have been deleted by prepareSStablesToSend)
296307
bytesWritten += calculatedTotalSize(newlyHashedFiles);
297308
validateSSTables(writerContext);
298309
}
299310

311+
protected void rebuildFilterComponents(@NotNull BulkWriterContext writerContext,
312+
@NotNull DirectoryStream.Filter<Path> filter) throws IOException
313+
{
314+
LocalDataLayer layer = buildLocalDataLayer(writerContext, getOutDir(), null);
315+
try (DirectoryStream<Path> dataFileStream = getDataFileStream(filter))
316+
{
317+
for (Path dataFile : dataFileStream)
318+
{
319+
FileSystemSSTable ssTable = new FileSystemSSTable(dataFile, false, BufferingInputStreamStats::doNothingStats);
320+
writerContext.bridge().rebuildBloomFilter(layer.partitioner(), layer.cqlTable(), ssTable, getOutDir());
321+
LOGGER.debug("Rebuilt bloom filter for sstable {}", dataFile);
322+
}
323+
}
324+
}
325+
300326
@VisibleForTesting
301327
public void validateSSTables(@NotNull BulkWriterContext writerContext)
302328
{
@@ -319,26 +345,7 @@ public void validateSSTables(@NotNull BulkWriterContext writerContext, @NotNull
319345
// and then validate all of them in parallel threads
320346
try
321347
{
322-
CassandraVersion version = CassandraBridgeFactory.getCassandraVersion(writerContext.cluster().getLowestCassandraVersion());
323-
String keyspace = writerContext.job().qualifiedTableName().keyspace();
324-
String schema = writerContext.schema().getTableSchema().createStatement;
325-
Partitioner partitioner = writerContext.cluster().getPartitioner();
326-
Set<String> udtStatements = writerContext.schema().getUserDefinedTypeStatements();
327-
LocalDataLayer layer = new LocalDataLayer(version,
328-
partitioner,
329-
keyspace,
330-
schema,
331-
udtStatements,
332-
Collections.emptyList() /* requestedFeatures */,
333-
false /* useSSTableInputStream */,
334-
null /* statsClass */,
335-
SSTableTimeRangeFilter.ALL,
336-
outputDirectory.toString());
337-
if (dataFilePaths != null)
338-
{
339-
layer.setDataFilePaths(dataFilePaths);
340-
}
341-
348+
LocalDataLayer layer = buildLocalDataLayer(writerContext, outputDirectory, dataFilePaths);
342349
try (StreamScanner<RowData> scanner = layer.openCompactionScanner(partitionId, Collections.emptyList(), null))
343350
{
344351
while (scanner.next())
@@ -354,6 +361,30 @@ public void validateSSTables(@NotNull BulkWriterContext writerContext, @NotNull
354361
}
355362
}
356363

364+
private LocalDataLayer buildLocalDataLayer(@NotNull BulkWriterContext writerContext, @NotNull Path outputDirectory, @Nullable Set<Path> dataFilePaths)
365+
{
366+
CassandraVersion version = CassandraBridgeFactory.getCassandraVersion(writerContext.cluster().getLowestCassandraVersion());
367+
String keyspace = writerContext.job().qualifiedTableName().keyspace();
368+
String schema = writerContext.schema().getTableSchema().createStatement;
369+
Partitioner partitioner = writerContext.cluster().getPartitioner();
370+
Set<String> udtStatements = writerContext.schema().getUserDefinedTypeStatements();
371+
LocalDataLayer layer = new LocalDataLayer(version,
372+
partitioner,
373+
keyspace,
374+
schema,
375+
udtStatements,
376+
Collections.emptyList() /* requestedFeatures */,
377+
false /* useSSTableInputStream */,
378+
null /* statsClass */,
379+
SSTableTimeRangeFilter.ALL,
380+
outputDirectory.toString());
381+
if (dataFilePaths != null)
382+
{
383+
layer.setDataFilePaths(dataFilePaths);
384+
}
385+
return layer;
386+
}
387+
357388
private DirectoryStream<Path> getDataFileStream(DirectoryStream.Filter<Path> filter) throws IOException
358389
{
359390
// Combine the data file filter with the provided filter

cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@
6969
import static org.apache.cassandra.spark.utils.FilterUtils.parseSSTableTimeRangeFilter;
7070

7171
/**
72-
* Basic DataLayer implementation to read SSTables from local file system. Mostly used for testing.
72+
* Basic DataLayer implementation to read SSTables from local file system.
73+
* Mostly used for testing, but also for validating SSTables during bulk data insertion.
7374
*/
7475
@SuppressWarnings({"unused", "WeakerAccess"})
7576
public class LocalDataLayer extends DataLayer implements Serializable

cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/NonValidatingTestSortedSSTableWriter.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.cassandra.spark.bulkwriter;
2121

22+
import java.nio.file.DirectoryStream;
2223
import java.nio.file.Path;
2324

2425
import org.apache.cassandra.spark.utils.DigestAlgorithm;
@@ -36,4 +37,11 @@ public void validateSSTables(@NotNull BulkWriterContext writerContext)
3637
{
3738
// Skip validation for these tests
3839
}
40+
41+
@Override
42+
protected void rebuildFilterComponents(@NotNull BulkWriterContext writerContext,
43+
@NotNull DirectoryStream.Filter<Path> filter)
44+
{
45+
// Skip rebuild for these tests
46+
}
3947
}

cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriterTest.java

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,31 @@
1919

2020
package org.apache.cassandra.spark.bulkwriter;
2121

22+
import java.io.File;
2223
import java.io.IOException;
2324
import java.math.BigInteger;
25+
import java.nio.ByteBuffer;
2426
import java.nio.file.DirectoryStream;
2527
import java.nio.file.Files;
2628
import java.nio.file.Path;
29+
import java.util.AbstractMap;
2730
import java.util.ArrayList;
2831
import java.util.Arrays;
32+
import java.util.Collections;
2933
import java.util.HashSet;
3034
import java.util.List;
3135
import java.util.Map;
3236
import java.util.Set;
37+
import java.util.SortedMap;
38+
import java.util.TreeMap;
3339
import java.util.concurrent.CountDownLatch;
3440
import java.util.concurrent.ExecutorService;
3541
import java.util.concurrent.Executors;
3642
import java.util.concurrent.Future;
3743
import java.util.concurrent.TimeUnit;
3844
import java.util.stream.Collectors;
3945

46+
import com.google.common.collect.ImmutableList;
4047
import com.google.common.collect.ImmutableMap;
4148
import com.google.common.util.concurrent.Uninterruptibles;
4249
import org.junit.jupiter.api.AfterAll;
@@ -45,12 +52,20 @@
4552
import org.junit.jupiter.params.ParameterizedTest;
4653
import org.junit.jupiter.params.provider.MethodSource;
4754

55+
import org.apache.cassandra.bridge.BloomFilter;
56+
import org.apache.cassandra.bridge.CassandraBridge;
57+
import org.apache.cassandra.bridge.CassandraBridgeFactory;
4858
import org.apache.cassandra.bridge.CassandraVersion;
4959
import org.apache.cassandra.bridge.CassandraVersionFeatures;
5060
import org.apache.cassandra.bridge.SSTableDescriptor;
5161
import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
5262
import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
5363
import org.apache.cassandra.spark.common.Digest;
64+
import org.apache.cassandra.spark.data.CqlTable;
65+
import org.apache.cassandra.spark.data.FileSystemSSTable;
66+
import org.apache.cassandra.spark.data.ReplicationFactor;
67+
import org.apache.cassandra.spark.data.partitioner.Partitioner;
68+
import org.apache.cassandra.spark.stats.BufferingInputStreamStats;
5469
import org.apache.cassandra.spark.utils.XXHash32DigestAlgorithm;
5570
import org.jetbrains.annotations.NotNull;
5671

@@ -148,6 +163,122 @@ public void canCreateWriterForVersion(String version) throws IOException
148163
tw.validateSSTables(writerContext, tw.getOutDir(), dataFilePaths);
149164
}
150165

166+
@ParameterizedTest
167+
@MethodSource("supportedVersions")
168+
public void testBloomFilterRebuild(String version) throws IOException
169+
{
170+
int rowCount = 50_000;
171+
CassandraBridge bridge = CassandraBridgeFactory.get(version);
172+
MockBulkWriterContext writerContext = new MockBulkWriterContext(tokenRangeMapping, version, ConsistencyLevel.CL.LOCAL_QUORUM);
173+
Partitioner partitioner = writerContext.getPartitioner();
174+
CqlTable cqlTable = bridge.buildSchema(writerContext.schema().getTableSchema().createStatement,
175+
writerContext.qualifiedTableName().keyspace(),
176+
new ReplicationFactor(ReplicationFactor.ReplicationStrategy.SimpleStrategy,
177+
ImmutableMap.of("replication_factor", 1)),
178+
partitioner,
179+
Collections.emptySet());
180+
SortedMap<BigInteger, List<String>> sortedKeys = new TreeMap<>();
181+
for (int i = 0; i < rowCount; ++i)
182+
{
183+
List<String> keys = ImmutableList.of(String.valueOf(i), "1");
184+
AbstractMap.SimpleEntry<ByteBuffer, BigInteger> partitionKey = bridge.getPartitionKey(cqlTable, partitioner, keys);
185+
sortedKeys.put(partitionKey.getValue(), keys);
186+
}
187+
188+
SortedSSTableWriter tw = new SortedSSTableWriter(writerContext, tmpDir, new XXHash32DigestAlgorithm(), 1);
189+
List<SSTableDescriptor> allSSTables = new ArrayList<>();
190+
tw.setSSTablesProducedListener(allSSTables::addAll);
191+
for (BigInteger token : sortedKeys.keySet())
192+
{
193+
List<String> partitionKey = sortedKeys.get(token);
194+
tw.addRow(token,
195+
ImmutableMap.of("id", Integer.parseInt(partitionKey.get(0)),
196+
"date", Integer.parseInt(partitionKey.get(1)),
197+
"course", "foo", "marks", 1));
198+
}
199+
tw.close(writerContext);
200+
201+
assertThat(allSSTables).hasSize(1);
202+
203+
Set<Path> filterFilePaths = new HashSet<>();
204+
try (DirectoryStream<Path> filterFileStream = Files.newDirectoryStream(tw.getOutDir(), "*-Filter.db"))
205+
{
206+
filterFileStream.forEach(filterFilePaths::add);
207+
}
208+
209+
assertThat(filterFilePaths).hasSize(1);
210+
211+
Path filterFile = filterFilePaths.iterator().next();
212+
String dataFileName = filterFile.toFile().getName().replace("-Filter", "-Data");
213+
Path dataFilePath = filterFile.getParent().resolve(dataFileName);
214+
FileSystemSSTable ssTable = new FileSystemSSTable(dataFilePath, false, BufferingInputStreamStats::doNothingStats);
215+
216+
BloomFilter bloomFilter = bridge.openBloomFilter(partitioner,
217+
writerContext.qualifiedTableName().keyspace(),
218+
writerContext.qualifiedTableName().table(),
219+
ssTable);
220+
221+
// second column is always set to 1 when inserting data
222+
List<ByteBuffer> searchKeys = bridge.encodePartitionKeys(partitioner,
223+
writerContext.qualifiedTableName().keyspace(),
224+
writerContext.schema().getTableSchema().createStatement,
225+
ImmutableList.of(ImmutableList.of("1", "1"), ImmutableList.of("7", "2")));
226+
227+
assertThat(bloomFilter.mightContain(searchKeys.get(0))).isTrue();
228+
// Flaky assertion: bloom filters can answer false positive, but since we are using limited data set,
229+
// it is unlikely to happen.
230+
assertThat(bloomFilter.doesNotContain(searchKeys.get(1))).isTrue();
231+
}
232+
233+
@ParameterizedTest
234+
@MethodSource("supportedVersions")
235+
public void testBloomFilterRebuildErrorHandling(String version) throws IOException
236+
{
237+
MockBulkWriterContext writerContext = new MockBulkWriterContext(tokenRangeMapping, version, ConsistencyLevel.CL.LOCAL_QUORUM);
238+
SortedSSTableWriter tw = new SortedSSTableWriter(writerContext, tmpDir, new XXHash32DigestAlgorithm(), 1)
239+
{
240+
protected void rebuildFilterComponents(@NotNull BulkWriterContext writerContext,
241+
@NotNull DirectoryStream.Filter<Path> filter) throws IOException
242+
{
243+
// temporarily move index file to simulate error in bloom filter rebuild process
244+
try (DirectoryStream<Path> indexFileStream = Files.newDirectoryStream(getOutDir(), "*.db"))
245+
{
246+
indexFileStream.forEach(indexFilePath -> {
247+
for (String indexSuffix : Arrays.asList("Partitions.db", "Index.db"))
248+
{
249+
if (indexFilePath.toFile().getName().endsWith(indexSuffix))
250+
{
251+
File indexFile = indexFilePath.toFile();
252+
boolean moved = indexFile.renameTo(new File(indexFile.getAbsolutePath() + "_hidden"));
253+
assertThat(moved).isTrue();
254+
}
255+
}
256+
});
257+
}
258+
super.rebuildFilterComponents(writerContext, filter);
259+
// move the index files back
260+
try (DirectoryStream<Path> hiddenFileStream = Files.newDirectoryStream(getOutDir(), "*_hidden"))
261+
{
262+
hiddenFileStream.forEach(hiddenFilePath -> {
263+
File hiddenFile = hiddenFilePath.toFile();
264+
boolean moved = hiddenFile.renameTo(new File(hiddenFile.getParent(), hiddenFile.getName().replace("_hidden", "")));
265+
assertThat(moved).isTrue();
266+
});
267+
}
268+
}
269+
};
270+
List<SSTableDescriptor> allSSTables = new ArrayList<>();
271+
tw.setSSTablesProducedListener(allSSTables::addAll);
272+
tw.addRow(BigInteger.ONE, ImmutableMap.of("id", 1, "date", 1, "course", "foo", "marks", 1));
273+
tw.close(writerContext);
274+
assertThat(allSSTables).hasSize(1);
275+
// verify that bloom filter was not created
276+
try (DirectoryStream<Path> filterFileStream = Files.newDirectoryStream(tw.getOutDir(), "*Filter.db"))
277+
{
278+
assertThat(filterFileStream.iterator().hasNext()).isFalse();
279+
}
280+
}
281+
151282
/**
152283
* Tests the race condition fix between prepareSStablesToSend (called from background threads)
153284
* and close (called from the main thread). This test exercises CASSANALYTICS-107.

cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.google.common.base.Preconditions;
3232
import com.google.common.collect.ImmutableMap;
3333
import com.google.common.collect.Lists;
34+
import org.apache.commons.lang3.StringUtils;
3435
import org.apache.commons.lang3.tuple.Pair;
3536

3637
import org.apache.cassandra.bridge.CassandraBridge;
@@ -467,7 +468,7 @@ private String getKeyDef()
467468
String clusteringKey = primaryColumns.stream()
468469
.map(this::maybeQuoteIdentifierIfRequested)
469470
.collect(Collectors.joining(","));
470-
return "PRIMARY KEY (" + partitionKey + clusteringKey + ")";
471+
return "PRIMARY KEY (" + partitionKey + (StringUtils.isNotBlank(clusteringKey) ? ("," + clusteringKey) : "") + ")";
471472
}
472473

473474
@Override

0 commit comments

Comments
 (0)