Skip to content

Commit f5e341f

Browse files
committed
add UUID as new entry value in the add_entries processor
Signed-off-by: Xun Zhang <xunzh@amazon.com>
1 parent d875a66 commit f5e341f

3 files changed

Lines changed: 144 additions & 4 deletions

File tree

data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessor.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.List;
3131
import java.util.Map;
3232
import java.util.Objects;
33+
import java.util.UUID;
3334
import java.util.function.Consumer;
3435
import java.util.function.Supplier;
3536

@@ -313,8 +314,10 @@ private Object retrieveValue(final AddEntryProcessorConfig.Entry entry, final Ev
313314
int entryIndex = entries.indexOf(entry);
314315
EntryProperties props = entryProperties.get(entryIndex);
315316
KeyInfo keyInfo = preprocessedKeys.get(entryIndex);
316-
317-
if (!Objects.isNull(entry.getValueExpression())) {
317+
318+
if (entry.getGenerateUuid()) {
319+
return UUID.randomUUID().toString();
320+
} else if (!Objects.isNull(entry.getValueExpression())) {
318321
value = expressionEvaluator.evaluate(entry.getValueExpression(), context);
319322
} else if (!Objects.isNull(entry.getFormat())) {
320323
try {

data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorConfig.java

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,12 @@ public static class Entry {
166166
})
167167
private boolean flattenKey = true;
168168

169+
@JsonProperty("generate_uuid")
170+
@JsonPropertyDescription("When set to <code>true</code>, generates a random UUID (version 4) as the value of the new entry. " +
171+
"Each event receives its own unique UUID, providing globally unique identifiers across distributed deployments " +
172+
"without any coordination between nodes. Cannot be used together with <code>value</code>, <code>format</code>, or <code>value_expression</code>.")
173+
private boolean generateUuid = false;
174+
169175
@JsonProperty("add_when")
170176
@JsonPropertyDescription("A <a href=\"https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/\">conditional expression</a>, " +
171177
"such as <code>/some-key == \"test\"</code>, that will be evaluated to determine whether the processor will be run on the event.")
@@ -212,11 +218,19 @@ public boolean getFlattenKey(){
212218
return flattenKey;
213219
}
214220

221+
public boolean getGenerateUuid() {
222+
return generateUuid;
223+
}
224+
215225
public String getAddWhen() { return addWhen; }
216226

217-
@AssertTrue(message = "Either value or format or expression must be specified, and only one of them can be specified")
227+
@AssertTrue(message = "Exactly one of value, format, value_expression, or generate_uuid must be specified")
218228
public boolean hasValueOrFormatOrExpression() {
219-
return Stream.of(value, format, valueExpression).filter(n -> n!=null).count() == 1;
229+
final long count = Stream.of(value, format, valueExpression).filter(n -> n != null).count();
230+
if (generateUuid) {
231+
return count == 0;
232+
}
233+
return count == 1;
220234
}
221235

222236
@AssertTrue(message = "overwrite_if_key_exists and append_if_key_exists can not be set to true at the same time.")
@@ -229,6 +243,46 @@ boolean flattenKeyFalseIsUsedWithIterateOn() {
229243
return (!flattenKey && iterateOn!=null) || flattenKey;
230244
}
231245

246+
public Entry(final String key,
247+
final String metadataKey,
248+
final Object value,
249+
final String format,
250+
final String valueExpression,
251+
final boolean overwriteIfKeyExists,
252+
final boolean appendIfKeyExists,
253+
final String addWhen,
254+
final String iterateOn,
255+
final boolean flattenKey,
256+
final String addToElementWhen,
257+
final boolean generateUuid)
258+
{
259+
if (key != null && metadataKey != null) {
260+
throw new IllegalArgumentException("Only one of the two - key and metadatakey - should be specified");
261+
}
262+
if (key == null && metadataKey == null) {
263+
throw new IllegalArgumentException("At least one of the two - key and metadatakey - must be specified");
264+
}
265+
if (metadataKey != null && iterateOn != null) {
266+
throw new IllegalArgumentException("iterate_on cannot be applied to metadata");
267+
}
268+
if (iterateOn == null && addToElementWhen != null) {
269+
throw new InvalidPluginConfigurationException("add_to_element_when only applies when iterate_on is configured.");
270+
}
271+
272+
this.key = key;
273+
this.metadataKey = metadataKey;
274+
this.value = value;
275+
this.format = format;
276+
this.valueExpression = valueExpression;
277+
this.overwriteIfKeyExists = overwriteIfKeyExists;
278+
this.appendIfKeyExists = appendIfKeyExists;
279+
this.addWhen = addWhen;
280+
this.iterateOn = iterateOn;
281+
this.flattenKey = flattenKey;
282+
this.addToElementWhen = addToElementWhen;
283+
this.generateUuid = generateUuid;
284+
}
285+
232286
public Entry(final String key,
233287
final String metadataKey,
234288
final Object value,

data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/AddEntryProcessorTests.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1088,6 +1088,81 @@ public void testAddFlattenedNestedEntryIterateOn() {
10881088
equalTo(List.of(Map.of("key", 5, "nested/newMessage", 3)))); // [{"key": 5, "nested/newMessage": 3}}]
10891089
}
10901090

1091+
@Test
1092+
void test_generate_uuid_adds_uuid_string_to_event() {
1093+
when(mockConfig.getEntries()).thenReturn(createListOfEntries(
1094+
createEntryWithGenerateUuid("recordId", false, null)));
1095+
1096+
final AddEntryProcessor processor = createObjectUnderTest();
1097+
final Record<Event> record = getEvent("test-message");
1098+
final List<Record<Event>> result = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));
1099+
1100+
final Event event = result.get(0).getData();
1101+
assertThat(event.containsKey("recordId"), is(true));
1102+
final String uuid = event.get("recordId", String.class);
1103+
assertThat(uuid, equalTo(UUID.fromString(uuid).toString()));
1104+
}
1105+
1106+
@Test
1107+
void test_generate_uuid_produces_unique_values_per_event() {
1108+
when(mockConfig.getEntries()).thenReturn(createListOfEntries(
1109+
createEntryWithGenerateUuid("recordId", false, null)));
1110+
1111+
final AddEntryProcessor processor = createObjectUnderTest();
1112+
final Record<Event> record1 = getEvent("message-one");
1113+
final Record<Event> record2 = getEvent("message-two");
1114+
final List<Record<Event>> result = (List<Record<Event>>) processor.doExecute(Arrays.asList(record1, record2));
1115+
1116+
final String uuid1 = result.get(0).getData().get("recordId", String.class);
1117+
final String uuid2 = result.get(1).getData().get("recordId", String.class);
1118+
assertThat(uuid1.equals(uuid2), is(false));
1119+
}
1120+
1121+
@Test
1122+
void test_generate_uuid_does_not_overwrite_existing_key_by_default() {
1123+
when(mockConfig.getEntries()).thenReturn(createListOfEntries(
1124+
createEntryWithGenerateUuid("existingId", false, null)));
1125+
1126+
final AddEntryProcessor processor = createObjectUnderTest();
1127+
final Map<String, Object> data = new HashMap<>();
1128+
data.put("existingId", "original-value");
1129+
final Record<Event> record = buildRecordWithEvent(data);
1130+
final List<Record<Event>> result = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));
1131+
1132+
assertThat(result.get(0).getData().get("existingId", String.class), equalTo("original-value"));
1133+
}
1134+
1135+
@Test
1136+
void test_generate_uuid_overwrites_existing_key_when_overwrite_is_true() {
1137+
when(mockConfig.getEntries()).thenReturn(createListOfEntries(
1138+
createEntryWithGenerateUuid("existingId", true, null)));
1139+
1140+
final AddEntryProcessor processor = createObjectUnderTest();
1141+
final Map<String, Object> data = new HashMap<>();
1142+
data.put("existingId", "original-value");
1143+
final Record<Event> record = buildRecordWithEvent(data);
1144+
final List<Record<Event>> result = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));
1145+
1146+
final String newValue = result.get(0).getData().get("existingId", String.class);
1147+
assertThat(newValue.equals("original-value"), is(false));
1148+
assertThat(newValue, equalTo(UUID.fromString(newValue).toString()));
1149+
}
1150+
1151+
@Test
1152+
void test_generate_uuid_respects_add_when_condition() {
1153+
final String addWhen = "/skip == true";
1154+
when(mockConfig.getEntries()).thenReturn(createListOfEntries(
1155+
createEntryWithGenerateUuid("recordId", false, addWhen)));
1156+
when(expressionEvaluator.isValidExpressionStatement(addWhen)).thenReturn(true);
1157+
when(expressionEvaluator.evaluateConditional(eq(addWhen), any())).thenReturn(false);
1158+
1159+
final AddEntryProcessor processor = createObjectUnderTest();
1160+
final Record<Event> record = getEvent("message");
1161+
final List<Record<Event>> result = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));
1162+
1163+
assertThat(result.get(0).getData().containsKey("recordId"), is(false));
1164+
}
1165+
10911166
private AddEntryProcessor createObjectUnderTest() {
10921167
return new AddEntryProcessor(pluginMetrics, mockConfig, expressionEvaluator, eventKeyFactory);
10931168
}
@@ -1124,6 +1199,14 @@ private AddEntryProcessorConfig.Entry createEntry(
11241199
iterateOn, addToElementWhen);
11251200
}
11261201

1202+
private AddEntryProcessorConfig.Entry createEntryWithGenerateUuid(
1203+
final String key,
1204+
final boolean overwriteIfKeyExists,
1205+
final String addWhen) {
1206+
return new AddEntryProcessorConfig.Entry(
1207+
key, null, null, null, null, overwriteIfKeyExists, false, addWhen, null, true, null, true);
1208+
}
1209+
11271210
private List<AddEntryProcessorConfig.Entry> createListOfEntries(final AddEntryProcessorConfig.Entry... entries) {
11281211
return new LinkedList<>(Arrays.asList(entries));
11291212
}

0 commit comments

Comments
 (0)