Skip to content

Commit b08a3a1

Browse files
alamzeeshanZeeshan Alam
andauthored
added delete_source functionality to the dissect processor (#5662)
* completed feature request Signed-off-by: Zeeshan Alam <Zeeshan.Alam+fidelity@fmr.com> * added missing import statement Signed-off-by: Zeeshan Alam <Zeeshan.Alam+fidelity@fmr.com> --------- Signed-off-by: Zeeshan Alam <Zeeshan.Alam+fidelity@fmr.com> Co-authored-by: Zeeshan Alam <Zeeshan.Alam+fidelity@fmr.com>
1 parent f290cfc commit b08a3a1

3 files changed

Lines changed: 67 additions & 0 deletions

File tree

data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
9090

9191
private void dissectField(Event event, String field){
9292
Dissector dissector = dissectorMap.get(field);
93+
boolean isDeleteSourceOnSuccessfulDissect = dissectConfig.isDeleteSourceRequested();
9394
String text = event.get(field, String.class);
9495
if (dissector.dissectText(text)) {
9596
List<Field> dissectedFields = dissector.getDissectedFields();
@@ -98,6 +99,9 @@ private void dissectField(Event event, String field){
9899
Object dissectFieldValue = convertTargetType(dissectFieldName,disectedField.getValue());
99100
event.put(disectedField.getKey(), dissectFieldValue);
100101
}
102+
if (isDeleteSourceOnSuccessfulDissect) {
103+
event.delete(field);
104+
}
101105
}
102106
}
103107

data-prepper-plugins/dissect-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessorConfig.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ public class DissectProcessorConfig {
5454
"For example, <code>/some_value == \"log\"</code>.")
5555
private String dissectWhen;
5656

57+
@JsonProperty
58+
@JsonPropertyDescription("If true, the configured fields in the <code>map</code> will be deleted if the processor was successful.")
59+
private boolean deleteSource;
60+
5761
public String getDissectWhen(){
5862
return dissectWhen;
5963
}
@@ -64,6 +68,10 @@ public Map<String, String> getMap() {
6468

6569
public Map<String, TargetType> getTargetTypes() { return targetTypeMap; }
6670

71+
public boolean isDeleteSourceRequested() {
72+
return deleteSource;
73+
}
74+
6775
@AssertTrue(message = "target_type must be a Map<String, TargetType>. Valid options include [ 'integer', 'string', 'double', 'boolean', 'long', and 'big_decimal' ]")
6876
boolean isTargetTypeValid() {
6977
try {

data-prepper-plugins/dissect-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessorTest.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import static org.hamcrest.MatcherAssert.assertThat;
3333
import static org.junit.jupiter.api.Assertions.assertThrows;
3434
import static org.junit.jupiter.api.Assertions.assertTrue;
35+
import static org.junit.jupiter.api.Assertions.assertFalse;
3536
import static org.mockito.ArgumentMatchers.any;
3637
import static org.mockito.Mockito.when;
3738

@@ -267,4 +268,58 @@ private void reflectivelySetDissectorMap(DissectProcessor processor) throws NoSu
267268
reflectField.setAccessible(false);
268269
}
269270
}
271+
272+
@Test
273+
void test_delete_source_requested() throws NoSuchFieldException, IllegalAccessException {
274+
275+
Field dissectedField = new NormalField("level");
276+
dissectedField.setValue("WARN");
277+
278+
when(dissector.dissectText(any(String.class))).thenReturn(true);
279+
when(dissector.getDissectedFields()).thenReturn(List.of(dissectedField));
280+
when(dissectConfig.isDeleteSourceRequested()).thenReturn(true);
281+
282+
final DissectProcessor processor = createObjectUnderTest();
283+
reflectivelySetDissectorMap(processor);
284+
final Record<Event> dataPrepperRecord = getEvent("2025-01-28T00:00:00.000Z WARN This is a test log");
285+
final List<Record<Event>> dissectedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(dataPrepperRecord));
286+
287+
assertTrue(dissectedRecords.get(0).getData().containsKey("level"));
288+
assertThat(dissectedRecords.get(0).getData().get("level", String.class), is("WARN"));
289+
assertFalse(dissectedRecords.get(0).getData().containsKey("test"));
290+
}
291+
292+
@Test
293+
void test_delete_source_not_requested() throws NoSuchFieldException, IllegalAccessException {
294+
Field dissectedField = new NormalField("level");
295+
dissectedField.setValue("WARN");
296+
297+
when(dissector.dissectText(any(String.class))).thenReturn(true);
298+
when(dissector.getDissectedFields()).thenReturn(List.of(dissectedField));
299+
when(dissectConfig.isDeleteSourceRequested()).thenReturn(false);
300+
301+
final DissectProcessor processor = createObjectUnderTest();
302+
reflectivelySetDissectorMap(processor);
303+
final Record<Event> dataPrepperRecord = getEvent("2025-01-28T00:00:00.000Z WARN This is a test log");
304+
final List<Record<Event>> dissectedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(dataPrepperRecord));
305+
306+
assertTrue(dissectedRecords.get(0).getData().containsKey("level"));
307+
assertThat(dissectedRecords.get(0).getData().get("level", String.class), is("WARN"));
308+
assertTrue(dissectedRecords.get(0).getData().containsKey("test"));
309+
}
310+
311+
@Test
312+
void test_delete_source_requested_dissect_fail() throws NoSuchFieldException, IllegalAccessException {
313+
314+
when(dissector.dissectText(any(String.class))).thenReturn(false);
315+
when(dissectConfig.isDeleteSourceRequested()).thenReturn(true);
316+
317+
final DissectProcessor processor = createObjectUnderTest();
318+
reflectivelySetDissectorMap(processor);
319+
final Record<Event> dataPrepperRecord = getEvent("2025-01-28T00:00:00.000Z WARN This is a test log");
320+
final List<Record<Event>> dissectedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(dataPrepperRecord));
321+
322+
assertTrue(dissectedRecords.get(0).getData().containsKey("test"));
323+
}
324+
270325
}

0 commit comments

Comments
 (0)