Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@ public class DeleteEntryProcessor extends AbstractProcessor<Record<Event>, Recor
private final List<EventKey> withKeys;
private final List<String> withKeysRegex;
private final List<Pattern> withKeysRegexPattern;
private final List<EventKey> deleteAllExcept;
private final Set<String> deleteAllExceptKeys;
private final Set<EventKey> excludeFromDelete;
private final String deleteWhen;
private final String iterateOn;
private final String deleteFromElementWhen;
private final List<DeleteEntryProcessorConfig.Entry> entries;

private final ExpressionEvaluator expressionEvaluator;
Expand All @@ -50,9 +54,15 @@ public DeleteEntryProcessor(final PluginMetrics pluginMetrics, final DeleteEntry
this.withKeys = config.getWithKeys();
this.withKeysRegex = config.getWithKeysRegex();
this.withKeysRegexPattern = config.getWithKeysRegexPattern();
this.deleteAllExcept = config.getDeleteAllExcept();
Comment thread
fanxiangrui-123 marked this conversation as resolved.
this.deleteAllExceptKeys = this.deleteAllExcept.stream()
.map(EventKey::getKey)
.collect(Collectors.toSet());
this.excludeFromDelete = config.getExcludeFromDelete();
this.deleteEntryProcessorConfig = config;
this.deleteWhen = config.getDeleteWhen();
this.iterateOn = config.getIterateOn();
this.deleteFromElementWhen = config.getDeleteFromElementWhen();
this.expressionEvaluator = expressionEvaluator;

if (deleteWhen != null
Expand All @@ -62,19 +72,33 @@ public DeleteEntryProcessor(final PluginMetrics pluginMetrics, final DeleteEntry
".org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", deleteWhen));
}

if (!this.withKeys.isEmpty() || !this.withKeysRegex.isEmpty()) {
if (!this.deleteAllExcept.isEmpty()) {
this.entries = Collections.emptyList();
} else if (!this.withKeys.isEmpty() || !this.withKeysRegex.isEmpty()) {
DeleteEntryProcessorConfig.Entry entry = new DeleteEntryProcessorConfig.Entry(
this.withKeys,
this.withKeysRegex,
this.excludeFromDelete,
this.deleteWhen,
config.getIterateOn(),
config.getDeleteFromElementWhen());
this.iterateOn,
this.deleteFromElementWhen);
this.entries = List.of(entry);
} else {
this.entries = config.getEntries();
}

if (!this.deleteAllExcept.isEmpty() && this.iterateOn == null && this.deleteFromElementWhen != null) {
throw new InvalidPluginConfigurationException("delete_from_element_when only applies when iterate_on is configured.");
}

if (!this.deleteAllExcept.isEmpty() && this.deleteFromElementWhen != null
&& !expressionEvaluator.isValidExpressionStatement(this.deleteFromElementWhen)) {
throw new InvalidPluginConfigurationException(
String.format("delete_from_element_when %s is not a valid expression statement. See https://opensearch" +
".org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax",
this.deleteFromElementWhen));
}

this.entries.forEach(entry -> {
if (entry.getDeleteWhen() != null
&& !expressionEvaluator.isValidExpressionStatement(entry.getDeleteWhen())) {
Expand Down Expand Up @@ -103,6 +127,11 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
for (final Record<Event> record : records) {
final Event recordEvent = record.getData();
try {
if (!deleteAllExcept.isEmpty()) {
processDeleteAllExcept(recordEvent);
continue;
}

for (final DeleteEntryProcessorConfig.Entry entry : entries) {
if (Objects.nonNull(entry.getDeleteWhen()) && !expressionEvaluator.evaluateConditional(entry.getDeleteWhen(), recordEvent)) {
continue;
Expand Down Expand Up @@ -168,6 +197,73 @@ private void deleteKeysFromEvent(final Event event, final DeleteEntryProcessorCo
}
}

private void processDeleteAllExcept(final Event recordEvent) {
if (Objects.nonNull(deleteWhen) && !expressionEvaluator.evaluateConditional(deleteWhen, recordEvent)) {
return;
}

if (Objects.isNull(iterateOn)) {
deleteAllExceptFromEvent(recordEvent);
} else {
handleDeleteAllExceptForIterateOn(recordEvent);
}
}

private void deleteAllExceptFromEvent(final Event event) {
deleteAllExceptFromMap(event, "", event.toMap());
}

private void deleteAllExceptFromMap(final Event event, final String parentPath, final Map<String, Object> values) {
for (final Map.Entry<String, Object> entry : values.entrySet()) {
final String path = parentPath.isEmpty() ? entry.getKey() : parentPath + "/" + entry.getKey();

if (deleteAllExceptKeys.contains(path)) {
continue;
}

if (entry.getValue() instanceof Map && hasAllowlistedDescendant(path)) {
deleteAllExceptFromMap(event, path, (Map<String, Object>) entry.getValue());
if (!hasExistingAllowlistedDescendant(event, path)) {
event.delete(path);
}
} else {
event.delete(path);
}
}
}

private boolean hasAllowlistedDescendant(final String path) {
return deleteAllExceptKeys.stream()
.anyMatch(keyToKeep -> keyToKeep.startsWith(path + "/"));
}

private boolean hasExistingAllowlistedDescendant(final Event event, final String path) {
return deleteAllExcept.stream()
.filter(keyToKeep -> keyToKeep.getKey().startsWith(path + "/"))
.anyMatch(event::containsKey);
}

private void handleDeleteAllExceptForIterateOn(final Event recordEvent) {
final List<Map<String, Object>> iterateOnList = recordEvent.get(iterateOn, List.class);
if (iterateOnList != null) {
for (int i = 0; i < iterateOnList.size(); i++) {
final Map<String, Object> item = iterateOnList.get(i);
final Event context = JacksonEvent.builder()
.withEventMetadata(recordEvent.getMetadata())
.withData(item)
.build();
if (deleteFromElementWhen != null &&
!expressionEvaluator.evaluateConditional(deleteFromElementWhen, context)) {
continue;
}

deleteAllExceptFromEvent(context);
iterateOnList.set(i, context.toMap());
}
recordEvent.put(iterateOn, iterateOnList);
}
}

private void handleForIterateOn(final Event recordEvent,
final DeleteEntryProcessorConfig.Entry entry,
final String iterateOn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,24 @@

@ConditionalRequired(value = {
@IfThenElse(
ifFulfilled = {@SchemaProperty(field = "entries", value = "null"), @SchemaProperty(field = "with_keys_regex", value = "null")},
ifFulfilled = {@SchemaProperty(field = "entries", value = "null"), @SchemaProperty(field = "with_keys_regex", value = "null"),
@SchemaProperty(field = "delete_all_except", value = "null")},
thenExpect = {@SchemaProperty(field = "with_keys")}
),
@IfThenElse(
ifFulfilled = {@SchemaProperty(field = "entries", value = "null"), @SchemaProperty(field = "with_keys", value = "null")},
ifFulfilled = {@SchemaProperty(field = "entries", value = "null"), @SchemaProperty(field = "with_keys", value = "null"),
@SchemaProperty(field = "delete_all_except", value = "null")},
thenExpect = {@SchemaProperty(field = "with_keys_regex")}
),
@IfThenElse(
ifFulfilled = {@SchemaProperty(field = "with_keys", value = "null"), @SchemaProperty(field = "with_keys_regex", value = "null")},
ifFulfilled = {@SchemaProperty(field = "with_keys", value = "null"), @SchemaProperty(field = "with_keys_regex", value = "null"),
@SchemaProperty(field = "delete_all_except", value = "null")},
thenExpect = {@SchemaProperty(field = "entries")}
),
@IfThenElse(
ifFulfilled = {@SchemaProperty(field = "entries", value = "null"), @SchemaProperty(field = "with_keys", value = "null"),
@SchemaProperty(field = "with_keys_regex", value = "null")},
thenExpect = {@SchemaProperty(field = "delete_all_except")}
)
})
@JsonPropertyOrder
Expand Down Expand Up @@ -165,6 +173,11 @@ public Entry() {
@JsonPropertyDescription("A list of regex patterns that match keys to be deleted from an event. May not be used with entries.")
private List<String> withKeysRegex;

@JsonProperty("delete_all_except")
@EventKeyConfiguration(EventKeyFactory.EventAction.GET)
@JsonPropertyDescription("A list of keys to keep in an event. All other keys are deleted. May not be used with entries, with_keys, or with_keys_regex.")
private List<@NotNull @NotEmpty EventKey> deleteAllExcept;

@JsonProperty("exclude_from_delete")
@JsonPropertyDescription("A list of keys to exclude from deletion when using with_keys_regex.")
private Set<EventKey> excludeFromDelete;
Expand All @@ -178,17 +191,18 @@ public Entry() {
@JsonPropertyDescription("A list of entries to delete from the event.")
private List<Entry> entries;

@AssertTrue(message = "One of the following must be provided: 'entries', 'with_keys', or 'with_keys_regex'. None of these are configured.")
@AssertTrue(message = "One of the following must be provided: 'entries', 'with_keys', 'with_keys_regex', or 'delete_all_except'. None of these are configured.")
boolean isConfigurationPresent() {
return entries != null || withKeys != null || withKeysRegex != null;
return entries != null || withKeys != null || withKeysRegex != null || deleteAllExcept != null;
}

@AssertTrue(message = "You can only use one of the following at a time: 'entries', 'with_keys', or 'with_keys_regex'")
@AssertTrue(message = "You can only use one of the following at a time: 'entries', 'with_keys', 'with_keys_regex', or 'delete_all_except'")
boolean hasOnlyOneConfiguration() {
int count = 0;
if (entries != null) count++;
if (withKeys != null) count++;
if (withKeysRegex != null) count++;
if (deleteAllExcept != null) count++;
return count == 1;
}

Expand Down Expand Up @@ -235,6 +249,10 @@ public List<String> getWithKeysRegex() {
return withKeysRegex != null ? withKeysRegex : Collections.emptyList();
}

public List<EventKey> getDeleteAllExcept() {
Comment thread
fanxiangrui-123 marked this conversation as resolved.
return deleteAllExcept != null ? deleteAllExcept : Collections.emptyList();
}

public List<Pattern> getWithKeysRegexPattern() {
if (withKeysRegexPatterns == null && withKeysRegex != null && !withKeysRegex.isEmpty()) {
setWithKeysRegexPatterns();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,38 @@ void testisExcludeFromDeleteValid_with_nonEmptyWithKeysRegex_and_nullExcludeKeys
assertThat(objectUnderTest.isExcludeFromDeleteValid(), equalTo(true));
}

@Test
void testConfigurationValidation_with_onlyDeleteAllExcept() throws NoSuchFieldException, IllegalAccessException {
final DeleteEntryProcessorConfig objectUnderTest = new DeleteEntryProcessorConfig();

ReflectivelySetField.setField(DeleteEntryProcessorConfig.class, objectUnderTest, "deleteAllExcept", List.of(mock(EventKey.class)));

assertThat(objectUnderTest.isConfigurationPresent(), equalTo(true));
assertThat(objectUnderTest.hasOnlyOneConfiguration(), equalTo(true));
}

@ParameterizedTest
@MethodSource("provideConflictingDeleteAllExceptConfigurations")
void testConfigurationValidation_with_deleteAllExcept_conflicts(
final String conflictingFieldName,
final Object conflictingFieldValue) throws NoSuchFieldException, IllegalAccessException {
final DeleteEntryProcessorConfig objectUnderTest = new DeleteEntryProcessorConfig();

ReflectivelySetField.setField(DeleteEntryProcessorConfig.class, objectUnderTest, "deleteAllExcept", List.of(mock(EventKey.class)));
ReflectivelySetField.setField(DeleteEntryProcessorConfig.class, objectUnderTest, conflictingFieldName, conflictingFieldValue);

assertThat(objectUnderTest.hasOnlyOneConfiguration(), equalTo(false));
}

private static Stream<Arguments> provideConflictingDeleteAllExceptConfigurations() {
return Stream.of(
Arguments.of("withKeys", List.of(mock(EventKey.class))),
Arguments.of("withKeysRegex", List.of("^test.*")),
Arguments.of("entries", List.of(new DeleteEntryProcessorConfig.Entry(
List.of(mock(EventKey.class)), null, null, null, null, null)))
);
}

@ParameterizedTest
@MethodSource("provideEntriesForExcludeFromDeleteValidation")
void testIsExcludeFromDeleteValid_with_entries(DeleteEntryProcessorConfig.Entry entry, boolean expectedResult) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,83 @@ public void testMultiDeleteProcessorTest() {
assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true));
}

@Test
public void test_delete_all_except_keeps_only_configured_keys() {
when(mockConfig.getDeleteAllExcept()).thenReturn(List.of(
eventKeyFactory.createEventKey("message", EventKeyFactory.EventAction.GET),
eventKeyFactory.createEventKey("keep_me", EventKeyFactory.EventAction.GET)));
when(mockConfig.getDeleteWhen()).thenReturn(null);

final DeleteEntryProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("thisisamessage");
record.getData().put("keep_me", "keep");
record.getData().put("delete_me", "delete");

final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));

assertThat(editedRecords.get(0).getData().containsKey("message"), is(true));
assertThat(editedRecords.get(0).getData().containsKey("keep_me"), is(true));
assertThat(editedRecords.get(0).getData().containsKey("delete_me"), is(false));
}

@Test
public void test_delete_all_except_ignores_missing_keys() {
when(mockConfig.getDeleteAllExcept()).thenReturn(List.of(
eventKeyFactory.createEventKey("message", EventKeyFactory.EventAction.GET),
eventKeyFactory.createEventKey("missing_key", EventKeyFactory.EventAction.GET)));
when(mockConfig.getDeleteWhen()).thenReturn(null);

final DeleteEntryProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("thisisamessage");
record.getData().put("delete_me", "delete");

final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));

assertThat(editedRecords.get(0).getData().containsKey("message"), is(true));
assertThat(editedRecords.get(0).getData().containsKey("missing_key"), is(false));
assertThat(editedRecords.get(0).getData().containsKey("delete_me"), is(false));
}

@Test
public void test_delete_all_except_keeps_nested_key() {
when(mockConfig.getDeleteAllExcept()).thenReturn(List.of(
eventKeyFactory.createEventKey("user/name", EventKeyFactory.EventAction.GET)));
when(mockConfig.getDeleteWhen()).thenReturn(null);

final DeleteEntryProcessor processor = createObjectUnderTest();
final Map<String, Object> data = new HashMap<>();
data.put("message", "thisisamessage");
data.put("user", Map.of("name", "Jane", "id", "123"));
final Record<Event> record = buildRecordWithEvent(data);

final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));

assertThat(editedRecords.get(0).getData().containsKey("user/name"), is(true));
assertThat(editedRecords.get(0).getData().get("user/name", String.class), equalTo("Jane"));
assertThat(editedRecords.get(0).getData().containsKey("user/id"), is(false));
assertThat(editedRecords.get(0).getData().containsKey("message"), is(false));
}

@Test
public void test_delete_all_except_does_not_delete_when_deleteWhen_returns_false() {
final String deleteWhen = UUID.randomUUID().toString();
when(mockConfig.getDeleteAllExcept()).thenReturn(List.of(
eventKeyFactory.createEventKey("message", EventKeyFactory.EventAction.GET)));
when(mockConfig.getDeleteWhen()).thenReturn(deleteWhen);
when(expressionEvaluator.isValidExpressionStatement(deleteWhen)).thenReturn(true);

final DeleteEntryProcessor processor = createObjectUnderTest();
final Record<Event> record = getEvent("thisisamessage");
record.getData().put("newMessage", "test");

when(expressionEvaluator.evaluateConditional(deleteWhen, record.getData())).thenReturn(false);

final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));

assertThat(editedRecords.get(0).getData().containsKey("message"), is(true));
assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true));
}

@Test
public void testKeyIsNotDeleted_when_deleteWhen_returns_false() {
when(mockConfig.getWithKeys()).thenReturn(List.of(eventKeyFactory.createEventKey("message", EventKeyFactory.EventAction.DELETE)));
Expand Down Expand Up @@ -383,6 +460,7 @@ public void test_no_configuration_used() {

ReflectionTestUtils.setField(configObjectUnderTest, "withKeys", null);
ReflectionTestUtils.setField(configObjectUnderTest, "withKeysRegex", null);
ReflectionTestUtils.setField(configObjectUnderTest, "deleteAllExcept", null);
ReflectionTestUtils.setField(configObjectUnderTest, "entries", null);

assertThat(configObjectUnderTest.isConfigurationPresent(), is(false));
Expand Down