Skip to content

Commit 19fd9c8

Browse files
committed
Addressed review comments
Signed-off-by: Kondaka <krishkdk@amazon.com>
1 parent 24b2304 commit 19fd9c8

6 files changed

Lines changed: 83 additions & 36 deletions

File tree

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,16 @@ public interface Event extends Serializable {
3535
*/
3636
void put(EventKey key, Object value);
3737

38+
/**
39+
* Adds or updates the key with a given value in the Event
40+
*
41+
* @param key where the value will be set
42+
* @param value value to set the key to
43+
* @param replaceInvalidCharacters flag indicating if invalid characters should be replaced or not
44+
* @since 2.13
45+
*/
46+
void put(EventKey key, Object value, boolean replaceInvalidCharacters);
47+
3848
/**
3949
* Adds or updates the key with a given value in the Event
4050
*

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,33 @@ public JsonNode getJsonNode() {
142142
return jsonNode;
143143
}
144144

145+
void normalizeKeys(Map<String, Object> map) {
146+
Map<String, Object> toAdd = new HashMap<>();
147+
Iterator<Map.Entry<String, Object>> iterator = map.entrySet().iterator();
148+
while (iterator.hasNext()) {
149+
Map.Entry<String, Object> entry = iterator.next();
150+
String key = entry.getKey();
151+
Object value = entry.getValue();
152+
final String newKey = JacksonEventKey.replaceInvalidCharacters(key);
153+
if (value instanceof Map) {
154+
normalizeKeys((Map<String, Object>)value);
155+
}
156+
if (!newKey.equals(key)) {
157+
toAdd.put(newKey, value);
158+
iterator.remove();
159+
}
160+
}
161+
map.putAll(toAdd);
162+
}
163+
164+
@Override
165+
public void put(EventKey key, Object value, boolean replaceInvalidCharacters) {
166+
if (replaceInvalidCharacters && (value instanceof Map)) {
167+
normalizeKeys((Map<String, Object>)value);
168+
}
169+
put(key, value);
170+
}
171+
145172
@Override
146173
public void put(EventKey key, Object value) {
147174
final JacksonEventKey jacksonEventKey = asJacksonEventKey(key);
@@ -170,6 +197,9 @@ public void put(EventKey key, Object value) {
170197
public void put(String key, final Object value, final boolean replaceInvalidCharacters) {
171198
if (replaceInvalidCharacters) {
172199
key = JacksonEventKey.replaceInvalidCharacters(key);
200+
if (value instanceof Map) {
201+
normalizeKeys((Map<String, Object>)value);
202+
}
173203
}
174204
put(key, value);
175205
}

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,41 @@ public void testPutAndGet_withMultiLevelKey_eventKey() {
278278
assertThat(result, is(equalTo(value)));
279279
}
280280

281+
@Test
282+
public void testPutAndGet_withMultiLevelInvalidValues() {
283+
final Map<String, Object> data1 = new HashMap<>();
284+
final Map<String, Object> data2 = new HashMap<>();
285+
final Map<String, Object> data3 = new HashMap<>();
286+
data3.put("key$5", "value5");
287+
data2.put("key^3", 3);
288+
data2.put("key%4", data3);
289+
data1.put("key 1", "value1");
290+
data1.put("key&2", data2);
291+
292+
event.put("foo", data1, true);
293+
assertThat(event.get("foo/key_1", String.class), equalTo("value1"));
294+
assertThat(event.get("foo/key_2/key_3", Integer.class), equalTo(3));
295+
assertThat(event.get("foo/key_2/key_4/key_5", String.class), equalTo("value5"));
296+
}
297+
298+
@Test
299+
public void testPutAndGet_withMultiLevelInvalidValues_eventKey() {
300+
final EventKey key = new JacksonEventKey("foo");
301+
final Map<String, Object> data1 = new HashMap<>();
302+
final Map<String, Object> data2 = new HashMap<>();
303+
final Map<String, Object> data3 = new HashMap<>();
304+
data3.put("key$5", "value5");
305+
data2.put("key^3", 3);
306+
data2.put("key%4", data3);
307+
data1.put("key 1", "value1");
308+
data1.put("key&2", data2);
309+
310+
event.put(key, data1, true);
311+
assertThat(event.get("foo/key_1", String.class), equalTo("value1"));
312+
assertThat(event.get("foo/key_2/key_3", Integer.class), equalTo(3));
313+
assertThat(event.get("foo/key_2/key_4/key_5", String.class), equalTo("value5"));
314+
}
315+
281316
@Test
282317
public void testPutAndGet_withMultiLevelKeyTwice() {
283318
final String key = "foo/bar";

data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public class KeyValueProcessor extends AbstractProcessor<Record<Event>, Record<E
6565
private final List<String> tagsOnFailure;
6666
private final Character stringLiteralCharacter;
6767
private final String keyPrefix;
68-
private final Boolean normalizeKeys;
68+
private final boolean normalizeKeys;
6969

7070
@DataPrepperPluginConstructor
7171
public KeyValueProcessor(final PluginMetrics pluginMetrics,
@@ -393,7 +393,7 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
393393
} else {
394394
if (keyValueProcessorConfig.getOverwriteIfDestinationExists() ||
395395
!recordEvent.containsKey(keyValueProcessorConfig.getDestination())) {
396-
recordEvent.put(keyValueProcessorConfig.getDestination(), processedMap);
396+
recordEvent.put(keyValueProcessorConfig.getDestination(), processedMap, normalizeKeys);
397397
}
398398
}
399399
} catch (final Exception e) {
@@ -610,9 +610,6 @@ private Map<String, Object> executeConfigs(Map<String, Object> map) {
610610
}
611611
}
612612

613-
if (normalizeKeys) {
614-
key = JacksonEvent.replaceInvalidKeyChars(key);
615-
}
616613
addKeyValueToMap(processed, key, value);
617614
}
618615

@@ -623,9 +620,6 @@ private Map<String, Object> executeConfigs(Map<String, Object> map) {
623620
}
624621
if (validKeyAndValue(pair.getKey(), pair.getValue())) {
625622
String key = pair.getKey();
626-
if (normalizeKeys) {
627-
key = JacksonEvent.replaceInvalidKeyChars(key);
628-
}
629623
processed.put(key, pair.getValue());
630624
}
631625
}
@@ -699,7 +693,7 @@ private void writeToRoot(final Event event, final Map<String, Object> parsedJson
699693
for (Map.Entry<String, Object> entry : parsedJson.entrySet()) {
700694
try {
701695
if (keyValueProcessorConfig.getOverwriteIfDestinationExists() || !event.containsKey(entry.getKey())) {
702-
event.put(entry.getKey(), entry.getValue());
696+
event.put(entry.getKey(), entry.getValue(), normalizeKeys);
703697
}
704698
} catch (IllegalArgumentException e) {
705699
LOG.warn("Failed to put key: "+entry.getKey()+" value : "+entry.getValue()+" into event. ", e);

data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/AbstractParseProcessor.java

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public abstract class AbstractParseProcessor extends AbstractProcessor<Record<Ev
5757

5858
private final ExpressionEvaluator expressionEvaluator;
5959
private final EventKeyFactory eventKeyFactory;
60+
private final boolean normalizeKeys;
6061

6162
protected AbstractParseProcessor(final PluginMetrics pluginMetrics,
6263
final CommonParseConfig commonParseConfig,
@@ -73,6 +74,7 @@ protected AbstractParseProcessor(final PluginMetrics pluginMetrics,
7374
deleteSourceRequested = commonParseConfig.isDeleteSourceRequested();
7475
handleFailedEventsOption = commonParseConfig.getHandleFailedEventsOption();
7576
processingFailuresCounter = pluginMetrics.counter(PROCESSING_FAILURES);
77+
normalizeKeys = commonParseConfig.getNormalizeKeys();
7678
this.expressionEvaluator = expressionEvaluator;
7779
this.eventKeyFactory = eventKeyFactory;
7880

@@ -116,25 +118,6 @@ protected Map<String, Object> convertNestedObjectToString(Map<String, Object> ma
116118
return resultMap;
117119
}
118120

119-
protected void replaceInvalidChars(Map<String, Object> map) {
120-
Map<String, Object> toAdd = new HashMap<>();
121-
Iterator<Map.Entry<String, Object>> iterator = map.entrySet().iterator();
122-
while (iterator.hasNext()) {
123-
Map.Entry<String, Object> entry = iterator.next();
124-
String key = entry.getKey();
125-
Object value = entry.getValue();
126-
final String newKey = JacksonEvent.replaceInvalidKeyChars(key);
127-
if (value instanceof Map) {
128-
replaceInvalidChars((Map<String, Object>)value);
129-
}
130-
if (!newKey.equals(key)) {
131-
toAdd.put(newKey, value);
132-
iterator.remove();
133-
}
134-
}
135-
map.putAll(toAdd);
136-
}
137-
138121
@Override
139122
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
140123
final boolean doWriteToRoot = Objects.isNull(destination);
@@ -168,7 +151,7 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
168151
if (doWriteToRoot) {
169152
writeToRoot(event, parsedValue);
170153
} else if (overwriteIfDestinationExists || !event.containsKey(destination)) {
171-
event.put(destination, parsedValue);
154+
event.put(destination, parsedValue, normalizeKeys);
172155
}
173156

174157
if(deleteSourceRequested) {
@@ -207,7 +190,7 @@ private Map<String, Object> parseUsingPointer(final Event event, final Map<Strin
207190
final boolean doWriteToRoot) {
208191
final Event temporaryEvent = JacksonEvent.builder().withEventType("event").build();
209192
final EventKey temporaryPutKey = eventKeyFactory.createEventKey(source.getKey(), EventKeyFactory.EventAction.PUT);
210-
temporaryEvent.put(temporaryPutKey, parsedJson);
193+
temporaryEvent.put(temporaryPutKey, parsedJson, normalizeKeys);
211194

212195
final String trimmedPointer = trimPointer(pointer);
213196
final String actualPointer = source + "/" + trimmedPointer;
@@ -259,7 +242,7 @@ private String trimPointer(final String pointer) {
259242
private void writeToRoot(final Event event, final Map<String, Object> parsedJson) {
260243
for (final Map.Entry<String, Object> entry : parsedJson.entrySet()) {
261244
if (overwriteIfDestinationExists || !event.containsKey(entry.getKey())) {
262-
event.put(eventKeyFactory.createEventKey(entry.getKey(), EventKeyFactory.EventAction.PUT), entry.getValue());
245+
event.put(eventKeyFactory.createEventKey(entry.getKey(), EventKeyFactory.EventAction.PUT), entry.getValue(), normalizeKeys);
263246
}
264247
}
265248
}

data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parse/json/ParseJsonProcessor.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ public class ParseJsonProcessor extends AbstractParseProcessor {
3838
private final Counter parseErrorsCounter;
3939

4040
private final int depth;
41-
private final Boolean normalizeKeys;
4241

4342
@DataPrepperPluginConstructor
4443
public ParseJsonProcessor(final PluginMetrics pluginMetrics,
@@ -48,17 +47,13 @@ public ParseJsonProcessor(final PluginMetrics pluginMetrics,
4847
super(pluginMetrics, parseJsonProcessorConfig, expressionEvaluator, eventKeyFactory);
4948
this.handleFailedEventsOption = parseJsonProcessorConfig.getHandleFailedEventsOption();
5049
this.depth = parseJsonProcessorConfig.getDepth();
51-
this.normalizeKeys = parseJsonProcessorConfig.getNormalizeKeys();
5250
parseErrorsCounter = pluginMetrics.counter(PARSE_ERRORS);
5351
}
5452

5553
@Override
5654
protected Optional<Map<String, Object>> readValue(String message, Event context) {
5755
try {
5856
final HashMap<String, Object> map = objectMapper.readValue(message, new TypeReference<>() {});
59-
if (normalizeKeys) {
60-
replaceInvalidChars(map);
61-
}
6257
if (depth == 0) {
6358
return Optional.of(map);
6459
}

0 commit comments

Comments
 (0)