Skip to content

Commit 48d5135

Browse files
authored
Add support for replacing invalid characters in parsing processors (#5823)
* Add support for replacing invalid characters in parsing processors Signed-off-by: Kondaka <krishkdk@amazon.com> * Addressed review comments Signed-off-by: Kondaka <krishkdk@amazon.com> * Addressed review comments Signed-off-by: Kondaka <krishkdk@amazon.com> * Fixed javadoc error Signed-off-by: Kondaka <krishkdk@amazon.com> * Addressed review comments Signed-off-by: Kondaka <krishkdk@amazon.com> * Fixed checkstyle errors Signed-off-by: Kondaka <krishkdk@amazon.com> --------- Signed-off-by: Kondaka <krishkdk@amazon.com>
1 parent f27b493 commit 48d5135

21 files changed

Lines changed: 295 additions & 12 deletions

File tree

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,26 @@ public interface Event extends Serializable {
3535
*/
3636
void put(EventKey key, Object value);
3737

38+
/**
39+
* Adds or updates the key with a given value in the Event
40+
*
41+
* @param key where the value will be set
42+
* @param value value to set the key to
43+
* @param replaceInvalidCharacters flag indicating if invalid characters should be replaced or not
44+
* @since 2.13
45+
*/
46+
void put(EventKey key, Object value, boolean replaceInvalidCharacters);
47+
48+
/**
49+
* Adds or updates the key with a given value in the Event
50+
*
51+
* @param key where the value will be set
52+
* @param value value to set the key to
53+
* @param replaceInvalidCharacters flag indicating if invalid characters should be replaced or not
54+
* @since 2.13
55+
*/
56+
void put(String key, Object value, boolean replaceInvalidCharacters);
57+
3858
/**
3959
* Adds or updates the key with a given value in the Event
4060
*

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,33 @@ public JsonNode getJsonNode() {
142142
return jsonNode;
143143
}
144144

145+
void normalizeKeys(Map<String, Object> map) {
146+
Map<String, Object> toAdd = new HashMap<>();
147+
Iterator<Map.Entry<String, Object>> iterator = map.entrySet().iterator();
148+
while (iterator.hasNext()) {
149+
Map.Entry<String, Object> entry = iterator.next();
150+
String key = entry.getKey();
151+
Object value = entry.getValue();
152+
final String newKey = JacksonEventKey.replaceInvalidCharacters(key);
153+
if (value instanceof Map) {
154+
normalizeKeys((Map<String, Object>)value);
155+
}
156+
if (!newKey.equals(key)) {
157+
toAdd.put(newKey, value);
158+
iterator.remove();
159+
}
160+
}
161+
map.putAll(toAdd);
162+
}
163+
164+
@Override
165+
public void put(EventKey key, Object value, boolean replaceInvalidCharacters) {
166+
if (replaceInvalidCharacters && (value instanceof Map)) {
167+
normalizeKeys((Map<String, Object>)value);
168+
}
169+
put(key, value);
170+
}
171+
145172
@Override
146173
public void put(EventKey key, Object value) {
147174
final JacksonEventKey jacksonEventKey = asJacksonEventKey(key);
@@ -166,6 +193,17 @@ public void put(EventKey key, Object value) {
166193
}
167194
}
168195

196+
@Override
197+
public void put(String key, final Object value, final boolean replaceInvalidCharacters) {
198+
if (replaceInvalidCharacters) {
199+
key = JacksonEventKey.replaceInvalidCharacters(key);
200+
if (value instanceof Map) {
201+
normalizeKeys((Map<String, Object>)value);
202+
}
203+
}
204+
put(key, value);
205+
}
206+
169207
/**
170208
* Adds or updates the key with a given value in the Event.
171209
*
@@ -261,6 +299,10 @@ public <T> T get(final String key, final Class<T> clazz) {
261299
return get(jacksonEventKey, clazz);
262300
}
263301

302+
public static String replaceInvalidKeyChars(final String key) {
303+
return JacksonEventKey.replaceInvalidCharacters(key);
304+
}
305+
264306
private JsonNode getNode(final String key) {
265307
final JsonPointer jsonPointer = toJsonPointer(key);
266308
return jsonNode.at(jsonPointer);

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEventKey.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@
1313
import java.util.List;
1414
import java.util.Objects;
1515
import java.util.Set;
16+
import java.util.regex.Pattern;
1617

1718
import static com.google.common.base.Preconditions.checkNotNull;
1819

1920
class JacksonEventKey implements EventKey {
21+
private static final String INVALID_KEY_REPLACEMENT = "_";
2022
private static final String SEPARATOR = "/";
2123
private static final int MAX_KEY_LENGTH = 2048;
2224
private final String key;
@@ -25,6 +27,8 @@ class JacksonEventKey implements EventKey {
2527
private List<String> keyPathList;
2628
private JsonPointer jsonPointer;
2729
private final Set<EventKeyFactory.EventAction> supportedActions;
30+
private static final Pattern INVALID_KEY_CHARS_PATTERN =
31+
Pattern.compile("[^A-Za-z0-9._~@/\\[\\]-]");
2832

2933
/**
3034
* Constructor for the JacksonEventKey which should only be used by implementation
@@ -151,6 +155,16 @@ static String trimTrailingSlashInKey(final String key) {
151155
return key.length() > 1 && key.endsWith(SEPARATOR) ? key.substring(0, key.length() - 1) : key;
152156
}
153157

158+
static String replaceInvalidCharacters(final String key) {
159+
if (key == null) {
160+
return null;
161+
}
162+
if (isValidKey(key)) {
163+
return key;
164+
}
165+
return INVALID_KEY_CHARS_PATTERN.matcher(key).replaceAll(INVALID_KEY_REPLACEMENT);
166+
}
167+
154168
private static boolean isValidKey(final String key) {
155169
for (int i = 0; i < key.length(); i++) {
156170
char c = key.charAt(i);

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventKeyTest.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,13 @@ public Stream<? extends Arguments> provideArguments(final ExtensionContext exten
196196
}
197197
}
198198

199+
@ParameterizedTest
200+
@ValueSource(strings = {"key 1", "key$1", "key&1", "key^1", "key%1", "key_1"})
201+
public void testReplaceInvalidKeyChars(final String key) {
202+
assertThat(JacksonEventKey.replaceInvalidCharacters(key), equalTo("key_1"));
203+
assertThat(JacksonEventKey.replaceInvalidCharacters(null), equalTo(null));
204+
}
205+
199206
@ParameterizedTest
200207
@EnumSource(EventKeyFactory.EventAction.class)
201208
void equals_returns_true_for_same_key_and_actions(final EventKeyFactory.EventAction eventAction) {
@@ -282,4 +289,4 @@ void toString_returns_the_key(final EventKeyFactory.EventAction eventAction) {
282289

283290
assertThat(objectUnderTest.toString(), equalTo(testKey));
284291
}
285-
}
292+
}

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,31 @@ public void testPutKeyCannotBeEmptyString() {
229229
assertThat(exception.getMessage(), containsStringIgnoringCase("key cannot be an empty string"));
230230
}
231231

232+
@ParameterizedTest
233+
@ValueSource(strings = {"key 1", "key$1", "key&1", "key^1", "key%1", "key_1"})
234+
public void testReplaceInvalidKeyChars(final String key) {
235+
assertThat(JacksonEvent.replaceInvalidKeyChars(key), equalTo("key_1"));
236+
assertThat(JacksonEvent.replaceInvalidKeyChars(key.substring(0,3)), equalTo("key"));
237+
assertThat(JacksonEvent.replaceInvalidKeyChars(null), equalTo(null));
238+
}
239+
240+
@ParameterizedTest
241+
@ValueSource(strings = {"key 1", "key$1", "key&1", "key^1", "key%1", "key_1"})
242+
public void testPutWithReplaceInvalidKeyChars(final String key) {
243+
final String value = UUID.randomUUID().toString();
244+
245+
event.put(key, value, true);
246+
assertThat(event.get("key_1", String.class), equalTo(value));
247+
}
248+
249+
@ParameterizedTest
250+
@ValueSource(strings = {"key 1", "key$1", "key&1", "key^1", "key%1"})
251+
public void testPutWithoutReplaceInvalidKeyChars(final String key) {
252+
final String value = UUID.randomUUID().toString();
253+
254+
assertThrows(IllegalArgumentException.class, () -> event.put(key, value, false));
255+
}
256+
232257
@Test
233258
public void testPutAndGet_withMultiLevelKey() {
234259
final String key = "foo/bar";
@@ -253,6 +278,41 @@ public void testPutAndGet_withMultiLevelKey_eventKey() {
253278
assertThat(result, is(equalTo(value)));
254279
}
255280

281+
@Test
282+
public void testPutAndGet_withMultiLevelInvalidValues() {
283+
final Map<String, Object> data1 = new HashMap<>();
284+
final Map<String, Object> data2 = new HashMap<>();
285+
final Map<String, Object> data3 = new HashMap<>();
286+
data3.put("key$5", "value5");
287+
data2.put("key^3", 3);
288+
data2.put("key%4", data3);
289+
data1.put("key 1", "value1");
290+
data1.put("key&2", data2);
291+
292+
event.put("foo", data1, true);
293+
assertThat(event.get("foo/key_1", String.class), equalTo("value1"));
294+
assertThat(event.get("foo/key_2/key_3", Integer.class), equalTo(3));
295+
assertThat(event.get("foo/key_2/key_4/key_5", String.class), equalTo("value5"));
296+
}
297+
298+
@Test
299+
public void testPutAndGet_withMultiLevelInvalidValues_eventKey() {
300+
final EventKey key = new JacksonEventKey("foo");
301+
final Map<String, Object> data1 = new HashMap<>();
302+
final Map<String, Object> data2 = new HashMap<>();
303+
final Map<String, Object> data3 = new HashMap<>();
304+
data3.put("key$5", "value5");
305+
data2.put("key^3", 3);
306+
data2.put("key%4", data3);
307+
data1.put("key 1", "value1");
308+
data1.put("key&2", data2);
309+
310+
event.put(key, data1, true);
311+
assertThat(event.get("foo/key_1", String.class), equalTo("value1"));
312+
assertThat(event.get("foo/key_2/key_3", Integer.class), equalTo(3));
313+
assertThat(event.get("foo/key_2/key_4/key_5", String.class), equalTo("value5"));
314+
}
315+
256316
@Test
257317
public void testPutAndGet_withMultiLevelKeyTwice() {
258318
final String key = "foo/bar";
@@ -1096,7 +1156,7 @@ void testJsonStringBuilderWithIncludeKeys() {
10961156

10971157

10981158
}
1099-
1159+
11001160
@Test
11011161
void testJsonStringBuilderWithExcludeKeys() {
11021162
final String jsonString = "{\"id\":1,\"foo\":\"bar\",\"info\":{\"name\":\"hello\",\"foo\":\"bar\"},\"tags\":[{\"key\":\"a\",\"value\":\"b\"},{\"key\":\"c\",\"value\":\"d\"}]}";

data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessor.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ public class CsvProcessor extends AbstractProcessor<Record<Event>, Record<Event>
5252
private final CsvMapper mapper;
5353
private final CsvSchema schema;
5454

55+
private final boolean normalizeKeys;
56+
5557
@DataPrepperPluginConstructor
5658
public CsvProcessor(final PluginMetrics pluginMetrics,
5759
final CsvProcessorConfig config,
@@ -67,6 +69,7 @@ public CsvProcessor(final PluginMetrics pluginMetrics,
6769
String.format("csv_when value of %s is not a valid expression statement. " +
6870
"See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax.", config.getCsvWhen()));
6971
}
72+
this.normalizeKeys = config.getNormalizeKeys();
7073
this.mapper = createCsvMapper();
7174
this.schema = createCsvSchema();
7275
}
@@ -107,7 +110,7 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
107110
}
108111
continue;
109112
}
110-
113+
111114

112115
final boolean userDidSpecifyHeaderEventKey = Objects.nonNull(config.getColumnNamesSourceKey());
113116
final boolean thisEventHasHeaderSource = userDidSpecifyHeaderEventKey && event.containsKey(config.getColumnNamesSourceKey());
@@ -210,7 +213,8 @@ private List<String> parseHeaderFromEventSourceKey(final Event event, final CsvM
210213
private void putDataInEvent(final Event event, final List<String> header, final List<String> data) {
211214
int providedHeaderColIdx = 0;
212215
for (; providedHeaderColIdx < header.size() && providedHeaderColIdx < data.size(); providedHeaderColIdx++) {
213-
event.put(header.get(providedHeaderColIdx), data.get(providedHeaderColIdx));
216+
String key = header.get(providedHeaderColIdx);
217+
event.put(key, data.get(providedHeaderColIdx), normalizeKeys);
214218
}
215219
for (int remainingColIdx = providedHeaderColIdx; remainingColIdx < data.size(); remainingColIdx++) {
216220
event.put(generateColumnHeader(remainingColIdx), data.get(remainingColIdx));

data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessorConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ public class CsvProcessorConfig {
4242
"is parsed. If there is no event header, no action is taken. Default value is true.")
4343
private Boolean deleteHeader = DEFAULT_DELETE_HEADERS;
4444

45+
@JsonProperty(value = "normalize_keys", defaultValue = "false")
46+
@JsonPropertyDescription("If set to true, replaces invalid characters with underscore character")
47+
private Boolean normalizeKeys = false;
48+
49+
4550
@JsonProperty(value = "multiline", defaultValue = "false")
4651
@JsonPropertyDescription("If specified, the source key has multiple lines, including header line")
4752
private Boolean multiLine = false;
@@ -130,6 +135,10 @@ public String getColumnNamesSourceKey() {
130135
return columnNamesSourceKey;
131136
}
132137

138+
public Boolean getNormalizeKeys() {
139+
return normalizeKeys;
140+
}
141+
133142
/**
134143
* A list of user-specified column names for the CSV data.
135144
* If column_names_source_key is defined, the header in column_names_source_key will be used to generate the Event fields.

data-prepper-plugins/csv-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessorTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,21 @@ void test_when_multLine_then_parsedCorrectly() {
141141
assertThatKeyEquals(parsedEvent, "key3", "3");
142142
}
143143

144+
@Test
145+
void test_when_multLine_invalid_keys_then_parsedCorrectly() {
146+
when(processorConfig.isMultiLine()).thenReturn(true);
147+
when(processorConfig.getDelimiter()).thenReturn(",");
148+
when(processorConfig.getNormalizeKeys()).thenReturn(true);
149+
csvProcessor = createObjectUnderTest();
150+
151+
Record<Event> eventUnderTest = createMessageEvent("key 1,key$2,key^3\n1,2,3");
152+
final List<Record<Event>> editedEvents = (List<Record<Event>>) csvProcessor.doExecute(Collections.singletonList(eventUnderTest));
153+
final Event parsedEvent = getSingleEvent(editedEvents);
154+
assertThatKeyEquals(parsedEvent, "key_1", "1");
155+
assertThatKeyEquals(parsedEvent, "key_2", "2");
156+
assertThatKeyEquals(parsedEvent, "key_3", "3");
157+
}
158+
144159
@Test
145160
void test_when_deleteHeaderAndHeaderSourceDefined_then_headerIsDeleted() {
146161
when(processorConfig.isDeleteHeader()).thenReturn(true);
@@ -161,6 +176,26 @@ void test_when_deleteHeaderAndHeaderSourceDefined_then_headerIsDeleted() {
161176
assertThatKeyEquals(parsedEvent, "col3", "3");
162177
}
163178

179+
@Test
180+
void test_when_ColumnNames_with_invalid_keys_parsedCorrectly() {
181+
when(processorConfig.getColumnNamesSourceKey()).thenReturn("header");
182+
when(processorConfig.getNormalizeKeys()).thenReturn(true);
183+
csvProcessor = createObjectUnderTest();
184+
185+
final Map<String, Object> eventData = new HashMap<>();
186+
eventData.put("message","1,2,3");
187+
eventData.put("header","col 1,col$2,col^3");
188+
Record<Event> eventUnderTest = buildRecordWithEvent(eventData);
189+
final List<Record<Event>> editedEvents = (List<Record<Event>>) csvProcessor.doExecute(Collections.singletonList(eventUnderTest));
190+
final Event parsedEvent = getSingleEvent(editedEvents);
191+
192+
assertThat(parsedEvent.containsKey("message"), equalTo(true));
193+
194+
assertThatKeyEquals(parsedEvent, "col_1", "1");
195+
assertThatKeyEquals(parsedEvent, "col_2", "2");
196+
assertThatKeyEquals(parsedEvent, "col_3", "3");
197+
}
198+
164199
@Test
165200
void test_when_headerSource_then_usesHeaderSourceForParsing() {
166201
when(processorConfig.isDeleteHeader()).thenReturn(false);

data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessor.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public class KeyValueProcessor extends AbstractProcessor<Record<Event>, Record<E
6464
private final List<String> tagsOnFailure;
6565
private final Character stringLiteralCharacter;
6666
private final String keyPrefix;
67+
private final boolean normalizeKeys;
6768

6869
@DataPrepperPluginConstructor
6970
public KeyValueProcessor(final PluginMetrics pluginMetrics,
@@ -76,6 +77,8 @@ public KeyValueProcessor(final PluginMetrics pluginMetrics,
7677

7778
tagsOnFailure = keyValueProcessorConfig.getTagsOnFailure();
7879

80+
this.normalizeKeys = keyValueProcessorConfig.getNormalizeKeys();
81+
7982
if (keyValueProcessorConfig.getFieldDelimiterRegex() != null
8083
&& !keyValueProcessorConfig.getFieldDelimiterRegex().isEmpty()) {
8184
if (keyValueProcessorConfig.getFieldSplitCharacters() != null
@@ -389,7 +392,7 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
389392
} else {
390393
if (keyValueProcessorConfig.getOverwriteIfDestinationExists() ||
391394
!recordEvent.containsKey(keyValueProcessorConfig.getDestination())) {
392-
recordEvent.put(keyValueProcessorConfig.getDestination(), processedMap);
395+
recordEvent.put(keyValueProcessorConfig.getDestination(), processedMap, normalizeKeys);
393396
}
394397
}
395398
} catch (final Exception e) {
@@ -615,7 +618,8 @@ private Map<String, Object> executeConfigs(Map<String, Object> map) {
615618
continue;
616619
}
617620
if (validKeyAndValue(pair.getKey(), pair.getValue())) {
618-
processed.put(pair.getKey(), pair.getValue());
621+
String key = pair.getKey();
622+
processed.put(key, pair.getValue());
619623
}
620624
}
621625

@@ -688,7 +692,7 @@ private void writeToRoot(final Event event, final Map<String, Object> parsedJson
688692
for (Map.Entry<String, Object> entry : parsedJson.entrySet()) {
689693
try {
690694
if (keyValueProcessorConfig.getOverwriteIfDestinationExists() || !event.containsKey(entry.getKey())) {
691-
event.put(entry.getKey(), entry.getValue());
695+
event.put(entry.getKey(), entry.getValue(), normalizeKeys);
692696
}
693697
} catch (IllegalArgumentException e) {
694698
LOG.warn("Failed to put key: "+entry.getKey()+" value : "+entry.getValue()+" into event. ", e);

0 commit comments

Comments
 (0)