Skip to content

Commit 130ba68

Browse files
committed
MLE-26918 Added fromView support for incremental write
The plan is to dump eval support and just offer fromLexicons and fromView, but need to test out fromView a bit first. Did some refactoring too because the constructors had gotten so ugly - there's now an IncrementalWriteConfig class that holds all the inputs from the Builder, so that filter constructors only need that as an arg.
1 parent 06d2f80 commit 130ba68

File tree

7 files changed

+271
-39
lines changed

7 files changed

+271
-39
lines changed
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright (c) 2010-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
3+
*/
4+
package com.marklogic.client.datamovement.filter;

import com.marklogic.client.document.DocumentWriteOperation;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;
11+
12+
/**
13+
* Configuration for incremental write filtering.
14+
*
15+
* @since 8.1.0
16+
*/
17+
public class IncrementalWriteConfig {
18+
19+
private final String hashKeyName;
20+
private final String timestampKeyName;
21+
private final boolean canonicalizeJson;
22+
private final Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer;
23+
private final String[] jsonExclusions;
24+
private final String[] xmlExclusions;
25+
private final Map<String, String> xmlNamespaces;
26+
private final String schemaName;
27+
private final String viewName;
28+
29+
public IncrementalWriteConfig(String hashKeyName, String timestampKeyName, boolean canonicalizeJson,
30+
Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer,
31+
String[] jsonExclusions, String[] xmlExclusions, Map<String, String> xmlNamespaces,
32+
String schemaName, String viewName) {
33+
this.hashKeyName = hashKeyName;
34+
this.timestampKeyName = timestampKeyName;
35+
this.canonicalizeJson = canonicalizeJson;
36+
this.skippedDocumentsConsumer = skippedDocumentsConsumer;
37+
this.jsonExclusions = jsonExclusions;
38+
this.xmlExclusions = xmlExclusions;
39+
this.xmlNamespaces = xmlNamespaces != null ? Collections.unmodifiableMap(xmlNamespaces) : null;
40+
this.schemaName = schemaName;
41+
this.viewName = viewName;
42+
}
43+
44+
public String getHashKeyName() {
45+
return hashKeyName;
46+
}
47+
48+
public String getTimestampKeyName() {
49+
return timestampKeyName;
50+
}
51+
52+
public boolean isCanonicalizeJson() {
53+
return canonicalizeJson;
54+
}
55+
56+
public Consumer<DocumentWriteOperation[]> getSkippedDocumentsConsumer() {
57+
return skippedDocumentsConsumer;
58+
}
59+
60+
public String[] getJsonExclusions() {
61+
return jsonExclusions;
62+
}
63+
64+
public String[] getXmlExclusions() {
65+
return xmlExclusions;
66+
}
67+
68+
public Map<String, String> getXmlNamespaces() {
69+
return xmlNamespaces != null ? xmlNamespaces : Collections.emptyMap();
70+
}
71+
72+
public String getSchemaName() {
73+
return schemaName;
74+
}
75+
76+
public String getViewName() {
77+
return viewName;
78+
}
79+
}

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteEvalFilter.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,6 @@
1212
import com.marklogic.client.document.DocumentWriteSet;
1313
import com.marklogic.client.io.JacksonHandle;
1414

15-
import java.util.Map;
16-
import java.util.function.Consumer;
17-
1815
/**
1916
* Uses server-side JavaScript code to get the existing hash values for a set of URIs.
2017
*
@@ -31,9 +28,8 @@ class IncrementalWriteEvalFilter extends IncrementalWriteFilter {
3128
response
3229
""";
3330

34-
IncrementalWriteEvalFilter(String hashKeyName, String timestampKeyName, boolean canonicalizeJson,
35-
Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer, String[] jsonExclusions, String[] xmlExclusions, Map<String, String> xmlNamespaces) {
36-
super(hashKeyName, timestampKeyName, canonicalizeJson, skippedDocumentsConsumer, jsonExclusions, xmlExclusions, xmlNamespaces);
31+
IncrementalWriteEvalFilter(IncrementalWriteConfig config) {
32+
super(config);
3733
}
3834

3935
@Override
@@ -47,7 +43,7 @@ public DocumentWriteSet apply(DocumentWriteSetFilter.Context context) {
4743

4844
try {
4945
JsonNode response = context.getDatabaseClient().newServerEval().javascript(EVAL_SCRIPT)
50-
.addVariable("hashKeyName", hashKeyName)
46+
.addVariable("hashKeyName", getConfig().getHashKeyName())
5147
.addVariable("uris", new JacksonHandle(uris))
5248
.evalAs(JsonNode.class);
5349

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java

Lines changed: 53 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ public static class Builder {
5353
private String[] jsonExclusions;
5454
private String[] xmlExclusions;
5555
private Map<String, String> xmlNamespaces;
56+
private String schemaName;
57+
private String viewName;
5658

5759
/**
5860
* @param keyName the name of the MarkLogic metadata key that will hold the hash value; defaults to "incrementalWriteHash".
@@ -128,13 +130,43 @@ public Builder xmlNamespaces(Map<String, String> namespaces) {
128130
return this;
129131
}
130132

133+
/**
134+
* Configures the filter to use a TDE view for retrieving hash values instead of field range indexes.
135+
* This approach requires a TDE template to be deployed that extracts the URI and hash metadata.
136+
*
137+
* @param schemaName the schema name of the TDE view
138+
* @param viewName the view name of the TDE view
139+
* @return this builder
140+
*/
141+
public Builder fromView(String schemaName, String viewName) {
142+
boolean schemaEmpty = schemaName == null || schemaName.trim().isEmpty();
143+
boolean viewEmpty = viewName == null || viewName.trim().isEmpty();
144+
145+
if (schemaEmpty && !viewEmpty) {
146+
throw new IllegalArgumentException("Schema name cannot be null or empty when view name is provided");
147+
}
148+
if (!schemaEmpty && viewEmpty) {
149+
throw new IllegalArgumentException("View name cannot be null or empty when schema name is provided");
150+
}
151+
152+
this.schemaName = schemaName;
153+
this.viewName = viewName;
154+
return this;
155+
}
156+
131157
public IncrementalWriteFilter build() {
132158
validateJsonExclusions();
133159
validateXmlExclusions();
160+
IncrementalWriteConfig config = new IncrementalWriteConfig(hashKeyName, timestampKeyName, canonicalizeJson,
161+
skippedDocumentsConsumer, jsonExclusions, xmlExclusions, xmlNamespaces, schemaName, viewName);
162+
163+
if (schemaName != null && viewName != null) {
164+
return new IncrementalWriteViewFilter(config);
165+
}
134166
if (useEvalQuery) {
135-
return new IncrementalWriteEvalFilter(hashKeyName, timestampKeyName, canonicalizeJson, skippedDocumentsConsumer, jsonExclusions, xmlExclusions, xmlNamespaces);
167+
return new IncrementalWriteEvalFilter(config);
136168
}
137-
return new IncrementalWriteOpticFilter(hashKeyName, timestampKeyName, canonicalizeJson, skippedDocumentsConsumer, jsonExclusions, xmlExclusions, xmlNamespaces);
169+
return new IncrementalWriteOpticFilter(config);
138170
}
139171

140172
private void validateJsonExclusions() {
@@ -181,26 +213,18 @@ private void validateXmlExclusions() {
181213
}
182214
}
183215

184-
protected final String hashKeyName;
185-
private final String timestampKeyName;
186-
private final boolean canonicalizeJson;
187-
private final Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer;
188-
private final String[] jsonExclusions;
189-
private final String[] xmlExclusions;
190-
private final Map<String, String> xmlNamespaces;
216+
private final IncrementalWriteConfig config;
191217

192218
// Hardcoding this for now, with a good general purpose hashing function.
193219
// See https://xxhash.com for benchmarks.
194220
private final LongHashFunction hashFunction = LongHashFunction.xx3();
195221

196-
public IncrementalWriteFilter(String hashKeyName, String timestampKeyName, boolean canonicalizeJson, Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer, String[] jsonExclusions, String[] xmlExclusions, Map<String, String> xmlNamespaces) {
197-
this.hashKeyName = hashKeyName;
198-
this.timestampKeyName = timestampKeyName;
199-
this.canonicalizeJson = canonicalizeJson;
200-
this.skippedDocumentsConsumer = skippedDocumentsConsumer;
201-
this.jsonExclusions = jsonExclusions;
202-
this.xmlExclusions = xmlExclusions;
203-
this.xmlNamespaces = xmlNamespaces;
222+
public IncrementalWriteFilter(IncrementalWriteConfig config) {
223+
this.config = config;
224+
}
225+
226+
public IncrementalWriteConfig getConfig() {
227+
return config;
204228
}
205229

206230
protected final DocumentWriteSet filterDocuments(Context context, Function<String, String> hashRetriever) {
@@ -230,19 +254,19 @@ protected final DocumentWriteSet filterDocuments(Context context, Function<Strin
230254

231255
if (existingHash != null) {
232256
if (!existingHash.equals(contentHash)) {
233-
newWriteSet.add(addHashToMetadata(doc, hashKeyName, contentHash, timestampKeyName, timestamp));
234-
} else if (skippedDocumentsConsumer != null) {
257+
newWriteSet.add(addHashToMetadata(doc, config.getHashKeyName(), contentHash, config.getTimestampKeyName(), timestamp));
258+
} else if (config.getSkippedDocumentsConsumer() != null) {
235259
skippedDocuments.add(doc);
236260
} else {
237261
// No consumer, so skip the document silently.
238262
}
239263
} else {
240-
newWriteSet.add(addHashToMetadata(doc, hashKeyName, contentHash, timestampKeyName, timestamp));
264+
newWriteSet.add(addHashToMetadata(doc, config.getHashKeyName(), contentHash, config.getTimestampKeyName(), timestamp));
241265
}
242266
}
243267

244-
if (!skippedDocuments.isEmpty() && skippedDocumentsConsumer != null) {
245-
skippedDocumentsConsumer.accept(skippedDocuments.toArray(new DocumentWriteOperation[0]));
268+
if (!skippedDocuments.isEmpty() && config.getSkippedDocumentsConsumer() != null) {
269+
config.getSkippedDocumentsConsumer().accept(skippedDocuments.toArray(new DocumentWriteOperation[0]));
246270
}
247271

248272
return newWriteSet;
@@ -259,11 +283,11 @@ private String serializeContent(DocumentWriteOperation doc) {
259283
format = baseHandle.getFormat();
260284
}
261285

262-
if (canonicalizeJson && (Format.JSON.equals(format) || isPossiblyJsonContent(content))) {
286+
if (config.isCanonicalizeJson() && (Format.JSON.equals(format) || isPossiblyJsonContent(content))) {
263287
JsonCanonicalizer jc;
264288
try {
265-
if (jsonExclusions != null && jsonExclusions.length > 0) {
266-
content = ContentExclusionUtil.applyJsonExclusions(doc.getUri(), content, jsonExclusions);
289+
if (config.getJsonExclusions() != null && config.getJsonExclusions().length > 0) {
290+
content = ContentExclusionUtil.applyJsonExclusions(doc.getUri(), content, config.getJsonExclusions());
267291
}
268292
jc = new JsonCanonicalizer(content);
269293
return jc.getEncodedString();
@@ -274,9 +298,9 @@ private String serializeContent(DocumentWriteOperation doc) {
274298
logger.warn("Unable to canonicalize JSON content for URI {}, using original content for hashing; cause: {}",
275299
doc.getUri(), e.getMessage());
276300
}
277-
} else if (xmlExclusions != null && xmlExclusions.length > 0) {
301+
} else if (config.getXmlExclusions() != null && config.getXmlExclusions().length > 0) {
278302
try {
279-
content = ContentExclusionUtil.applyXmlExclusions(doc.getUri(), content, xmlNamespaces, xmlExclusions);
303+
content = ContentExclusionUtil.applyXmlExclusions(doc.getUri(), content, config.getXmlNamespaces(), config.getXmlExclusions());
280304
} catch (Exception e) {
281305
logger.warn("Unable to apply XML exclusions for URI {}, using original content for hashing; cause: {}",
282306
doc.getUri(), e.getMessage());
@@ -316,4 +340,6 @@ protected static DocumentWriteOperation addHashToMetadata(DocumentWriteOperation
316340

317341
return new DocumentWriteOperationImpl(op.getUri(), newMetadata, op.getContent(), op.getTemporalDocumentURI());
318342
}
343+
344+
319345
}

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteOpticFilter.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010

1111
import java.util.HashMap;
1212
import java.util.Map;
13-
import java.util.function.Consumer;
1413

1514
/**
1615
* Uses an Optic query to get the existing hash values for a set of URIs.
@@ -19,9 +18,8 @@
1918
*/
2019
class IncrementalWriteOpticFilter extends IncrementalWriteFilter {
2120

22-
IncrementalWriteOpticFilter(String hashKeyName, String timestampKeyName, boolean canonicalizeJson,
23-
Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer, String[] jsonExclusions, String[] xmlExclusions, Map<String, String> xmlNamespaces) {
24-
super(hashKeyName, timestampKeyName, canonicalizeJson, skippedDocumentsConsumer, jsonExclusions, xmlExclusions, xmlNamespaces);
21+
IncrementalWriteOpticFilter(IncrementalWriteConfig config) {
22+
super(config);
2523
}
2624

2725
@Override
@@ -39,7 +37,7 @@ public DocumentWriteSet apply(Context context) {
3937
Map<String, String> existingHashes = rowTemplate.query(op ->
4038
op.fromLexicons(Map.of(
4139
"uri", op.cts.uriReference(),
42-
"hash", op.cts.fieldReference(super.hashKeyName)
40+
"hash", op.cts.fieldReference(getConfig().getHashKeyName())
4341
)).where(
4442
op.cts.documentQuery(op.xs.stringSeq(uris))
4543
),
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright (c) 2010-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
3+
*/
4+
package com.marklogic.client.datamovement.filter;
5+
6+
import com.marklogic.client.FailedRequestException;
7+
import com.marklogic.client.document.DocumentWriteOperation;
8+
import com.marklogic.client.document.DocumentWriteSet;
9+
import com.marklogic.client.row.RowTemplate;
10+
11+
import java.util.HashMap;
12+
import java.util.Map;
13+
14+
/**
15+
* Uses an Optic query with fromView to get the existing hash values for a set of URIs from a TDE view.
16+
* This implementation requires a TDE template to be deployed that extracts the URI and hash metadata.
17+
*
18+
* @since 8.1.0
19+
*/
20+
class IncrementalWriteViewFilter extends IncrementalWriteFilter {
21+
22+
IncrementalWriteViewFilter(IncrementalWriteConfig config) {
23+
super(config);
24+
}
25+
26+
@Override
27+
public DocumentWriteSet apply(Context context) {
28+
final String[] uris = context.getDocumentWriteSet().stream()
29+
.filter(op -> DocumentWriteOperation.OperationType.DOCUMENT_WRITE.equals(op.getOperationType()))
30+
.map(DocumentWriteOperation::getUri)
31+
.toArray(String[]::new);
32+
33+
RowTemplate rowTemplate = new RowTemplate(context.getDatabaseClient());
34+
35+
try {
36+
Map<String, String> existingHashes = rowTemplate.query(op ->
37+
op.fromView(getConfig().getSchemaName(), getConfig().getViewName())
38+
.where(op.in(op.col("uri"), op.xs.stringSeq(uris))),
39+
40+
rows -> {
41+
Map<String, String> map = new HashMap<>();
42+
rows.forEach(row -> {
43+
String uri = row.getString("uri");
44+
String existingHash = row.getString("hash");
45+
map.put(uri, existingHash);
46+
});
47+
return map;
48+
}
49+
);
50+
51+
return filterDocuments(context, uri -> existingHashes.get(uri));
52+
} catch (FailedRequestException e) {
53+
String message = "Unable to query for existing incremental write hashes from view " + getConfig().getSchemaName() + "." + getConfig().getViewName() + "; cause: " + e.getMessage();
54+
throw new FailedRequestException(message, e.getFailedRequest());
55+
}
56+
}
57+
}

0 commit comments

Comments
 (0)