Skip to content

Commit b7ea0eb

Browse files
committed
CASSANALYTICS-32 wip
1 parent 8f56f10 commit b7ea0eb

13 files changed

Lines changed: 61 additions & 14 deletions

File tree

cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ public class BulkSparkConf implements Serializable
121121
public final Integer sstableDataSizeInMiB;
122122
public final int commitBatchSize;
123123
public final boolean skipExtendedVerify;
124+
public final boolean skipRowsViolatingConstraints;
124125
public final WriteMode writeMode;
125126
public final int commitThreadsPerInstance;
126127
public final double importCoordinatorTimeoutMultiplier;
@@ -166,6 +167,8 @@ public BulkSparkConf(SparkConf conf, Map<String, String> options)
166167
this.table = MapUtils.getOrThrow(options, WriterOptions.TABLE.name());
167168
this.skipExtendedVerify = MapUtils.getBoolean(options, WriterOptions.SKIP_EXTENDED_VERIFY.name(), true,
168169
"skip extended verification of SSTables by Cassandra");
170+
this.skipRowsViolatingConstraints = MapUtils.getBoolean(options, WriterOptions.SKIP_ROWS_VIOLATING_CONSTRAINTS.name(), false,
171+
"skip rows which failed to be written because of violated constraints");
169172
this.consistencyLevel = ConsistencyLevel.CL.valueOf(MapUtils.getOrDefault(options, WriterOptions.BULK_WRITER_CL.name(), "EACH_QUORUM"));
170173
String dc = MapUtils.getOrDefault(options, WriterOptions.LOCAL_DC.name(), null);
171174
if (!consistencyLevel.isLocal() && dc != null)

cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,13 +199,14 @@ private void persist(@NotNull JavaPairRDD<DecoratedKey, Object[]> sortedRDD, Str
199199

200200
long rowCount = streamResults.stream().mapToLong(res -> res.rowCount).sum();
201201
long totalBytesWritten = streamResults.stream().mapToLong(res -> res.bytesWritten).sum();
202+
long rowsViolatedConstraints = streamResults.stream().mapToLong(res -> res.rowsViolatedConstraints).sum();
202203
boolean hasClusterTopologyChanged = writeResults.stream().anyMatch(WriteResult::isClusterResizeDetected);
203204

204205
onCloudStorageTransport(context -> waitForImportCompletion(context, rowCount, totalBytesWritten, hasClusterTopologyChanged, streamResults));
205206

206207
LOGGER.info("Bulk writer job complete. rowCount={} totalBytes={} hasClusterTopologyChanged={}",
207208
rowCount, totalBytesWritten, hasClusterTopologyChanged);
208-
publishSuccessfulJobStats(rowCount, totalBytesWritten, hasClusterTopologyChanged);
209+
publishSuccessfulJobStats(rowCount, totalBytesWritten, rowsViolatedConstraints, hasClusterTopologyChanged);
209210
}
210211
catch (Throwable throwable)
211212
{
@@ -300,13 +301,14 @@ private void awaitImportCompletion(TransportContext.CloudStorageTransportContext
300301
importCoordinator.await();
301302
}
302303

303-
private void publishSuccessfulJobStats(long rowCount, long totalBytesWritten, boolean hasClusterTopologyChanged)
304+
private void publishSuccessfulJobStats(long rowCount, long rowsViolatingConstraints, long totalBytesWritten, boolean hasClusterTopologyChanged)
304305
{
305306
writerContext.jobStats().publish(new HashMap<String, String>() // type declaration required to compile with java8
306307
{{
307308
put("jobId", writerContext.job().getId().toString());
308309
put("transportInfo", writerContext.job().transportInfo().toString());
309310
put("rowsWritten", Long.toString(rowCount));
311+
put("rowsViolatingConstraints", Long.toString(rowsViolatingConstraints));
310312
put("bytesWritten", Long.toString(totalBytesWritten));
311313
put("jobStatus", "Succeeded");
312314
put("clusterResizeDetected", String.valueOf(hasClusterTopologyChanged));

cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,12 @@ public boolean skipExtendedVerify()
7676
return conf.skipExtendedVerify;
7777
}
7878

79+
@Override
80+
public boolean skipRowsViolatingConstraints()
81+
{
82+
return conf.skipRowsViolatingConstraints;
83+
}
84+
7985
@Override
8086
public boolean getSkipClean()
8187
{

cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DirectStreamResult.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ public class DirectStreamResult extends StreamResult
3232

3333
public DirectStreamResult(String sessionID, Range<BigInteger> tokenRange,
3434
List<StreamError> failures, List<RingInstance> passed,
35-
long rowCount, long bytesWritten)
35+
long rowCount, long bytesWritten, long rowsViolatedConstraints)
3636
{
37-
super(sessionID, tokenRange, failures, passed, rowCount, bytesWritten);
37+
super(sessionID, tokenRange, failures, passed, rowCount, bytesWritten, rowsViolatedConstraints);
3838
}
3939

4040
public void setCommitResults(List<CommitResult> commitResult)
@@ -53,6 +53,7 @@ public String toString()
5353
+ ", commitResults=" + commitResults
5454
+ ", passed=" + passed
5555
+ ", bytesWritten=" + bytesWritten
56+
+ ", rowsViolatedConstraints=" + rowsViolatedConstraints
5657
+ '}';
5758
}
5859
}

cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSession.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,8 @@ protected StreamResult doFinalizeStream()
123123
errors,
124124
new ArrayList<>(replicas),
125125
sstableWriter.rowCount(),
126-
sstableWriter.bytesWritten());
126+
sstableWriter.bytesWritten(),
127+
sstableWriter.rowsViolatedConstraints());
127128
List<CommitResult> cr;
128129
try
129130
{

cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ default String getId()
7070

7171
boolean skipExtendedVerify();
7272

73+
boolean skipRowsViolatingConstraints();
74+
7375
boolean getSkipClean();
7476

7577
/**

cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SortedSSTableWriter.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,14 @@ public class SortedSSTableWriter
7777
private BigInteger maxToken = null;
7878
private final Map<Path, Digest> overallFileDigests = new HashMap<>();
7979
private final DigestAlgorithm digestAlgorithm;
80+
private final boolean skipRowsViolatingConstraints;
8081

8182
private volatile boolean isClosed = false;
8283

8384
private int sstableCount = 0;
8485
private long rowCount = 0;
8586
private long bytesWritten = 0;
87+
private long rowsViolatedConstraints = 0;
8688

8789
public SortedSSTableWriter(org.apache.cassandra.bridge.SSTableWriter tableWriter, Path outDir,
8890
DigestAlgorithm digestAlgorithm,
@@ -92,13 +94,15 @@ public SortedSSTableWriter(org.apache.cassandra.bridge.SSTableWriter tableWriter
9294
this.outDir = outDir;
9395
this.digestAlgorithm = digestAlgorithm;
9496
this.partitionId = partitionId;
97+
this.skipRowsViolatingConstraints = false;
9598
}
9699

97100
public SortedSSTableWriter(BulkWriterContext writerContext, Path outDir, DigestAlgorithm digestAlgorithm, int partitionId)
98101
{
99102
this.outDir = outDir;
100103
this.digestAlgorithm = digestAlgorithm;
101104
this.partitionId = partitionId;
105+
this.skipRowsViolatingConstraints = writerContext.job().skipRowsViolatingConstraints();
102106

103107
String lowestCassandraVersion = writerContext.cluster().getLowestCassandraVersion();
104108
String packageVersion = getPackageVersion(lowestCassandraVersion);
@@ -137,8 +141,24 @@ public void addRow(BigInteger token, Map<String, Object> boundValues) throws IOE
137141
}
138142
// rows are sorted. Therefore, only update the maxToken
139143
maxToken = token;
140-
cqlSSTableWriter.addRow(boundValues);
141-
rowCount += 1;
144+
try
145+
{
146+
cqlSSTableWriter.addRow(boundValues);
147+
rowCount += 1;
148+
}
149+
catch (Throwable t)
150+
{
151+
if (t.getCause() != null && t.getCause().getClass().getName().equals("org.apache.cassandra.cql3.constraints.ConstraintViolationException"))
152+
{
153+
rowsViolatedConstraints += 1;
154+
if (!skipRowsViolatingConstraints)
155+
throw t;
156+
}
157+
else
158+
{
159+
throw t;
160+
}
161+
}
142162
}
143163

144164
public void setSSTablesProducedListener(Consumer<Set<SSTableDescriptor>> listener)
@@ -324,4 +344,9 @@ public Map<Path, Digest> fileDigestMap()
324344
{
325345
return Collections.unmodifiableMap(overallFileDigests);
326346
}
347+
348+
public long rowsViolatedConstraints()
349+
{
350+
return rowsViolatedConstraints;
351+
}
327352
}

cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamResult.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,19 +34,22 @@ public abstract class StreamResult implements Serializable
3434
public final List<RingInstance> passed;
3535
public final long rowCount;
3636
public final long bytesWritten;
37+
public final long rowsViolatedConstraints;
3738

3839
protected StreamResult(String sessionID,
3940
Range<BigInteger> tokenRange,
4041
List<StreamError> failures,
4142
List<RingInstance> passed,
4243
long rowCount,
43-
long bytesWritten)
44+
long bytesWritten,
45+
long rowsViolatedConstraints)
4446
{
4547
this.sessionID = sessionID;
4648
this.tokenRange = tokenRange;
4749
this.failures = failures;
4850
this.passed = passed;
4951
this.rowCount = rowCount;
5052
this.bytesWritten = bytesWritten;
53+
this.rowsViolatedConstraints = rowsViolatedConstraints;
5154
}
5255
}

cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public enum WriterOptions implements WriterOption
4545
COMMIT_THREADS_PER_INSTANCE,
4646
COMMIT_BATCH_SIZE,
4747
SKIP_EXTENDED_VERIFY,
48+
SKIP_ROWS_VIOLATING_CONSTRAINTS,
4849
WRITE_MODE,
4950
KEYSTORE_PASSWORD,
5051
KEYSTORE_PATH,

cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/CloudStorageStreamResult.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class CloudStorageStreamResult extends StreamResult
4444

4545
public static CloudStorageStreamResult empty(String sessionID, Range<BigInteger> tokenRange)
4646
{
47-
return new CloudStorageStreamResult(sessionID, tokenRange, new ArrayList<>(), new ArrayList<>(), new HashSet<>(), 0, 0, 0);
47+
return new CloudStorageStreamResult(sessionID, tokenRange, new ArrayList<>(), new ArrayList<>(), new HashSet<>(), 0, 0, 0, 0);
4848
}
4949

5050
public CloudStorageStreamResult(String sessionID,
@@ -54,9 +54,10 @@ public CloudStorageStreamResult(String sessionID,
5454
Set<CreatedRestoreSlice> createdRestoreSlices,
5555
int objectCount,
5656
long rowCount,
57-
long bytesWritten)
57+
long bytesWritten,
58+
long rowsViolatedConstraints)
5859
{
59-
super(sessionID, tokenRange, failures, passed, rowCount, bytesWritten);
60+
super(sessionID, tokenRange, failures, passed, rowCount, bytesWritten, rowsViolatedConstraints);
6061
this.createdRestoreSlices = Collections.unmodifiableSet(createdRestoreSlices);
6162
this.objectCount = objectCount;
6263
}
@@ -70,6 +71,7 @@ public String toString()
7071
+ ", objectCount=" + objectCount
7172
+ ", rowCount=" + rowCount
7273
+ ", bytesWritten=" + bytesWritten
74+
+ ", rowsViolatedConstraints=" + rowsViolatedConstraints
7375
+ ", failures=" + failures
7476
+ ", createdRestoreSlices=" + createdRestoreSlices
7577
+ ", passed=" + passed

0 commit comments

Comments
 (0)