Skip to content

Commit c3fb86f

Browse files
committed
Fix KafkaSourceJsonTypeIT: update kafka_headers type from byte[] to String
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
1 parent 2f21000 commit c3fb86f

1 file changed

Lines changed: 12 additions & 12 deletions

File tree

  • data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source

data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,9 @@ public class KafkaSourceJsonTypeIT {
113113
private String testTopic;
114114
private String testGroup;
115115
private String headerKey1;
116-
private byte[] headerValue1;
116+
private String headerValue1;
117117
private String headerKey2;
118-
private byte[] headerValue2;
118+
private String headerValue2;
119119

120120
public KafkaSource createObjectUnderTest() {
121121
return new KafkaSource(sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription,
@@ -125,9 +125,9 @@ public KafkaSource createObjectUnderTest() {
125125
@BeforeEach
126126
public void setup() throws Throwable {
127127
headerKey1 = RandomStringUtils.randomAlphabetic(6);
128-
headerValue1 = RandomStringUtils.randomAlphabetic(10).getBytes(StandardCharsets.UTF_8);
128+
headerValue1 = RandomStringUtils.randomAlphabetic(10);
129129
headerKey2 = RandomStringUtils.randomAlphabetic(5);
130-
headerValue2 = RandomStringUtils.randomAlphabetic(15).getBytes(StandardCharsets.UTF_8);
130+
headerValue2 = RandomStringUtils.randomAlphabetic(15);
131131
sourceConfig = mock(KafkaSourceConfig.class);
132132
pluginMetrics = mock(PluginMetrics.class);
133133
counter = mock(Counter.class);
@@ -235,7 +235,7 @@ public void TestJsonRecordsWithNullKey() throws Exception {
235235
assertThat(map.get("kafka_key"), equalTo(null));
236236
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic));
237237
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
238-
Map<String, byte[]> kafkaHeaders = (Map<String, byte[]>) metadata.getAttributes().get("kafka_headers");
238+
Map<String, String> kafkaHeaders = (Map<String, String>) metadata.getAttributes().get("kafka_headers");
239239
assertThat(kafkaHeaders.get(headerKey1), equalTo(headerValue1));
240240
assertThat(kafkaHeaders.get(headerKey2), equalTo(headerValue2));
241241
assertThat(metadata.getAttributes().get("kafka_timestamp"), not(equalTo(null)));
@@ -273,7 +273,7 @@ public void TestJsonRecordsWithNegativeAcknowledgements() throws Exception {
273273
assertThat(map.get("status"), equalTo(true));
274274
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic));
275275
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
276-
Map<String, byte[]> kafkaHeaders = (Map<String, byte[]>) metadata.getAttributes().get("kafka_headers");
276+
Map<String, String> kafkaHeaders = (Map<String, String>) metadata.getAttributes().get("kafka_headers");
277277
assertThat(kafkaHeaders.get(headerKey1), equalTo(headerValue1));
278278
assertThat(kafkaHeaders.get(headerKey2), equalTo(headerValue2));
279279
assertThat(metadata.getAttributes().get("kafka_timestamp"), not(equalTo(null)));
@@ -298,7 +298,7 @@ public void TestJsonRecordsWithNegativeAcknowledgements() throws Exception {
298298
assertThat(map.get("status"), equalTo(true));
299299
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic));
300300
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
301-
Map<String, byte[]> kafkaHeaders = (Map<String, byte[]>) metadata.getAttributes().get("kafka_headers");
301+
Map<String, String> kafkaHeaders = (Map<String, String>) metadata.getAttributes().get("kafka_headers");
302302
assertThat(kafkaHeaders.get(headerKey1), equalTo(headerValue1));
303303
assertThat(kafkaHeaders.get(headerKey2), equalTo(headerValue2));
304304
assertThat(metadata.getAttributes().get("kafka_timestamp"), not(equalTo(null)));
@@ -336,7 +336,7 @@ public void TestJsonRecordsWithKafkaKeyModeDiscard() throws Exception {
336336
assertThat(map.get("status"), equalTo(true));
337337
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic));
338338
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
339-
Map<String, byte[]> kafkaHeaders = (Map<String, byte[]>) metadata.getAttributes().get("kafka_headers");
339+
Map<String, String> kafkaHeaders = (Map<String, String>) metadata.getAttributes().get("kafka_headers");
340340
assertThat(kafkaHeaders.get(headerKey1), equalTo(headerValue1));
341341
assertThat(kafkaHeaders.get(headerKey2), equalTo(headerValue2));
342342
assertThat(metadata.getAttributes().get("kafka_timestamp"), not(equalTo(null)));
@@ -374,7 +374,7 @@ public void TestJsonRecordsWithKafkaKeyModeAsField() throws Exception {
374374
assertThat(map.get("kafka_key"), equalTo(testKey));
375375
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic));
376376
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
377-
Map<String, byte[]> kafkaHeaders = (Map<String, byte[]>) metadata.getAttributes().get("kafka_headers");
377+
Map<String, String> kafkaHeaders = (Map<String, String>) metadata.getAttributes().get("kafka_headers");
378378
assertThat(kafkaHeaders.get(headerKey1), equalTo(headerValue1));
379379
assertThat(kafkaHeaders.get(headerKey2), equalTo(headerValue2));
380380
assertThat(metadata.getAttributes().get("kafka_timestamp"), not(equalTo(null)));
@@ -412,7 +412,7 @@ public void TestJsonRecordsWithKafkaKeyModeAsMetadata() throws Exception {
412412
assertThat(metadata.getAttributes().get("kafka_key"), equalTo(testKey));
413413
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic));
414414
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
415-
Map<String, byte[]> kafkaHeaders = (Map<String, byte[]>) metadata.getAttributes().get("kafka_headers");
415+
Map<String, String> kafkaHeaders = (Map<String, String>) metadata.getAttributes().get("kafka_headers");
416416
assertThat(kafkaHeaders.get(headerKey1), equalTo(headerValue1));
417417
assertThat(kafkaHeaders.get(headerKey2), equalTo(headerValue2));
418418
assertThat(metadata.getAttributes().get("kafka_timestamp"), not(equalTo(null)));
@@ -433,8 +433,8 @@ public void produceJsonRecords(final String servers, final String topicName, fin
433433
for (int i = 0; i < numRecords; i++) {
434434
String value = "{\"name\":\"testName" + i + "\", \"id\":" + (TEST_ID + i) + ", \"status\":true}";
435435
List<Header> headers = Arrays.asList(
436-
new RecordHeader(headerKey1, headerValue1),
437-
new RecordHeader(headerKey2, headerValue2)
436+
new RecordHeader(headerKey1, headerValue1.getBytes(StandardCharsets.UTF_8)),
437+
new RecordHeader(headerKey2, headerValue2.getBytes(StandardCharsets.UTF_8))
438438
);
439439
ProducerRecord<String, String> record =
440440
new ProducerRecord<>(topicName, null, testKey, value, new RecordHeaders(headers));

0 commit comments

Comments
 (0)