Skip to content

Commit 6e4f9c0

Browse files
committed
MLE-26918 Added FilterException to identify filter-related errors
Important for the batch retrier in the Spark connector so it doesn't retry a batch when the filter fails. Improved the code for closing handles as well in BatchWriter.
1 parent fcc5cc0 commit 6e4f9c0

File tree

6 files changed

+94
-33
lines changed

6 files changed

+94
-33
lines changed
marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/FilterException.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Copyright (c) 2010-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
3+
*/
4+
package com.marklogic.client.datamovement.filter;
5+
6+
import com.marklogic.client.datamovement.DataMovementException;
7+
8+
/**
9+
* Any exception thrown by execution of a {@code DocumentWriteSetFilter} will be wrapped in this exception and
10+
* rethrown by the {@code WriteBatcher}, allowing failure listeners to distinguish filter exceptions from other
11+
* exceptions that may occur during batch processing.
12+
*/
13+
public class FilterException extends DataMovementException {
14+
15+
public FilterException(String message, Throwable cause) {
16+
super(message, cause);
17+
}
18+
}

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteOpticFilter.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ public DocumentWriteSet apply(Context context) {
5353
}
5454
);
5555

56+
if (logger.isDebugEnabled()) {
57+
logger.debug("Retrieved {} existing hashes for batch of size {}", existingHashes.size(), uris.length);
58+
}
59+
5660
return filterDocuments(context, uri -> existingHashes.get(uri));
5761
} catch (FailedRequestException e) {
5862
String message = "Unable to query for existing incremental write hashes; cause: " + e.getMessage();

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteViewFilter.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ public DocumentWriteSet apply(Context context) {
4848
}
4949
);
5050

51+
if (logger.isDebugEnabled()) {
52+
logger.debug("Retrieved {} existing hashes for batch of size {}", existingHashes.size(), uris.length);
53+
}
54+
5155
return filterDocuments(context, uri -> existingHashes.get(uri));
5256
} catch (FailedRequestException e) {
5357
String message = "Unable to query for existing incremental write hashes from view " + getConfig().getSchemaName() + "." + getConfig().getViewName() + "; cause: " + e.getMessage();

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriter.java

Lines changed: 35 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
/*
2-
* Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
2+
* Copyright (c) 2010-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
33
*/
44
package com.marklogic.client.datamovement.impl;
55

66
import com.marklogic.client.datamovement.DocumentWriteSetFilter;
7+
import com.marklogic.client.datamovement.filter.FilterException;
78
import com.marklogic.client.document.DocumentWriteOperation;
89
import com.marklogic.client.document.DocumentWriteSet;
910
import com.marklogic.client.document.XMLDocumentManager;
11+
import com.marklogic.client.impl.IoUtil;
1012
import com.marklogic.client.io.Format;
1113
import org.slf4j.Logger;
1214
import org.slf4j.LoggerFactory;
@@ -25,29 +27,33 @@ public void run() {
2527
return;
2628
}
2729

28-
try {
29-
logger.trace("Begin write batch {} to forest on host '{}'", batchWriteSet.getBatchNumber(), batchWriteSet.getClient().getHost());
30+
logger.trace("Begin write batch {} to forest on host '{}'", batchWriteSet.getBatchNumber(), batchWriteSet.getClient().getHost());
31+
DocumentWriteSet documentWriteSet = batchWriteSet.getDocumentWriteSet();
3032

31-
DocumentWriteSet documentWriteSet = batchWriteSet.getDocumentWriteSet();
32-
if (filter != null) {
33+
if (filter != null) {
34+
try {
3335
documentWriteSet = filter.apply(batchWriteSet);
34-
if (documentWriteSet == null || documentWriteSet.isEmpty()) {
35-
logger.debug("Filter returned empty write set for batch {}, skipping write", batchWriteSet.getBatchNumber());
36-
closeAllHandles();
37-
return;
38-
}
39-
batchWriteSet.updateWithFilteredDocumentWriteSet(documentWriteSet);
36+
} catch (Exception e) {
37+
closeAllHandles();
38+
String message = String.format("Unable to apply filter to batch %d; cause: %s", batchWriteSet.getBatchNumber(), e.getMessage());
39+
onFailure(new FilterException(message, e));
40+
return;
41+
}
42+
if (documentWriteSet == null || documentWriteSet.isEmpty()) {
43+
closeAllHandles();
44+
logger.debug("Filter returned empty write set for batch {}, skipping write", batchWriteSet.getBatchNumber());
45+
return;
4046
}
47+
batchWriteSet.updateWithFilteredDocumentWriteSet(documentWriteSet);
48+
}
4149

50+
try {
4251
writeDocuments(documentWriteSet);
43-
44-
// This seems like it should be part of a finally block - but it's able to throw an exception. Which implies
45-
// that onFailure() should occur when this fails, which seems odd???
46-
closeAllHandles();
47-
4852
onSuccess();
4953
} catch (Throwable t) {
5054
onFailure(t);
55+
} finally {
56+
closeAllHandles();
5157
}
5258
}
5359

@@ -79,21 +85,21 @@ private void onFailure(Throwable t) {
7985
}
8086
}
8187

82-
private void closeAllHandles() throws Throwable {
83-
Throwable lastThrowable = null;
88+
/**
89+
* This used to throw a Throwable... but it's not clear what a user would ever do with that if a content handle
90+
* cannot be closed. Instead, this has been altered to use closeQuietly.
91+
*/
92+
private void closeAllHandles() {
8493
for (DocumentWriteOperation doc : batchWriteSet.getDocumentWriteSet()) {
85-
try {
86-
if (doc.getContent() instanceof Closeable closeable) {
87-
closeable.close();
88-
}
89-
if (doc.getMetadata() instanceof Closeable closeable) {
90-
closeable.close();
91-
}
92-
} catch (Throwable t) {
93-
logger.error("Error closing all handles in BatchWriter", t);
94-
lastThrowable = t;
94+
if (doc == null) {
95+
continue;
96+
}
97+
if (doc.getContent() instanceof Closeable closeable) {
98+
IoUtil.closeQuietly(closeable);
99+
}
100+
if (doc.getMetadata() instanceof Closeable closeable) {
101+
IoUtil.closeQuietly(closeable);
95102
}
96103
}
97-
if (lastThrowable != null) throw lastThrowable;
98104
}
99105
}

marklogic-client-api/src/main/java/com/marklogic/client/impl/IoUtil.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
/*
2-
* Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
2+
* Copyright (c) 2010-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
33
*/
44
package com.marklogic.client.impl;
55

6+
import org.slf4j.LoggerFactory;
7+
68
import java.io.ByteArrayOutputStream;
9+
import java.io.Closeable;
710
import java.io.IOException;
811
import java.io.InputStream;
912

@@ -23,4 +26,17 @@ static byte[] streamToBytes(InputStream stream) throws IOException {
2326
buffer.flush();
2427
return buffer.toByteArray();
2528
}
29+
30+
static void closeQuietly(Closeable closeable) {
31+
// Reinvented here as we don't yet have a dependency on a 3rd party library that provides this method, and it's
32+
// not worth bringing in a dependency just for this.
33+
if (closeable != null) {
34+
try {
35+
closeable.close();
36+
} catch (IOException e) {
37+
LoggerFactory.getLogger(IoUtil.class)
38+
.warn("Unexpected exception while closing stream: %s".formatted(e.getMessage()), e);
39+
}
40+
}
41+
}
2642
}

marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package com.marklogic.client.datamovement.filter;
55

66
import com.fasterxml.jackson.databind.node.ObjectNode;
7+
import com.marklogic.client.FailedRequestException;
78
import com.marklogic.client.document.*;
89
import com.marklogic.client.impl.DocumentWriteOperationImpl;
910
import com.marklogic.client.io.*;
@@ -128,10 +129,22 @@ void noRangeIndexForField() {
128129
writeTenDocuments();
129130

130131
assertNotNull(batchFailure.get());
131-
String message = batchFailure.get().getMessage();
132-
assertTrue(message.contains("Unable to query for existing incremental write hashes") && message.contains("XDMP-FIELDRIDXNOTFOUND"),
132+
assertTrue(batchFailure.get() instanceof FilterException,
133+
"If the filter fails, the exception should be wrapped in a FilterException so that WriteBatcher " +
134+
"failure listeners can distinguish the difference between a filter failure and a write failure. " +
135+
"If the filter fails, there is likely no reason to retry the request. Actual exception class: " + batchFailure.get().getClass());
136+
137+
FilterException filterException = (FilterException) batchFailure.get();
138+
String message = filterException.getMessage();
139+
assertTrue(message.startsWith("Unable to apply filter to batch 1; cause: "),
140+
"The filter exception message should provide context that the error happened when a filter was applied. " +
141+
"Actual message: " + message);
142+
143+
assertTrue(filterException.getCause() instanceof FailedRequestException);
144+
String causeMessage = filterException.getCause().getMessage();
145+
assertTrue(causeMessage.contains("Unable to query for existing incremental write hashes") && causeMessage.contains("XDMP-FIELDRIDXNOTFOUND"),
133146
"When the user tries to use the incremental write feature without the required range index, we should " +
134-
"fail with a helpful error message. Actual message: " + message);
147+
"fail with a helpful error message. Actual message: " + causeMessage);
135148
}
136149

137150
@Test

0 commit comments

Comments
 (0)