Skip to content

Commit d2aa114

Browse files
authored
Track the source of request for Kafka server (#4572)
Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
1 parent 17790ff commit d2aa114

6 files changed

Lines changed: 47 additions & 22 deletions

File tree

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ class BufferTopicConfig extends CommonTopicConfig implements TopicProducerConfig
6565
@Size(min = 1, max = 255, message = "size of group id should be between 1 and 255")
6666
private String groupId;
6767

68+
@JsonProperty("client_id")
69+
@Valid
70+
@Size(min = 1, max = 255, message = "size of client id should be between 1 and 255")
71+
private String clientId;
72+
6873
@JsonProperty("workers")
6974
@Valid
7075
@Size(min = 1, max = 200, message = "Number of worker threads should lies between 1 and 200")
@@ -135,6 +140,11 @@ public String getGroupId() {
135140
return groupId;
136141
}
137142

143+
@Override
144+
public String getClientId() {
145+
return clientId;
146+
}
147+
138148
@Override
139149
public Duration getCommitInterval() {
140150
return commitInterval;

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/TopicConsumerConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ public interface TopicConsumerConfig extends TopicConfig {
1616

1717
String getGroupId();
1818

19+
String getClientId();
20+
1921
Boolean getAutoCommit();
2022

2123
String getAutoOffsetReset();

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,14 +134,19 @@ private Properties getConsumerProperties(final KafkaConsumerConfig sourceConfig,
134134
break;
135135
}
136136
}
137-
setConsumerTopicProperties(properties, topicConfig);
137+
setConsumerTopicProperties(properties, topicConfig, topicConfig.getGroupId());
138138
setSchemaRegistryProperties(sourceConfig, properties, topicConfig);
139139
LOG.debug("Starting consumer with the properties : {}", properties);
140140
return properties;
141141
}
142142

143-
private void setConsumerTopicProperties(final Properties properties, final TopicConsumerConfig topicConfig) {
144-
properties.put(ConsumerConfig.GROUP_ID_CONFIG, topicConfig.getGroupId());
143+
144+
public static void setConsumerTopicProperties(final Properties properties, final TopicConsumerConfig topicConfig,
145+
final String groupId) {
146+
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
147+
if (Objects.nonNull(topicConfig.getClientId())) {
148+
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, topicConfig.getClientId());
149+
}
145150
properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, (int)topicConfig.getMaxPartitionFetchBytes());
146151
properties.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, ((Long)topicConfig.getRetryBackoff().toMillis()).intValue());
147152
properties.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, ((Long)topicConfig.getReconnectBackoff().toMillis()).intValue());

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaRegistryType;
3838
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
3939
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumer;
40+
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumerFactory;
4041
import org.opensearch.dataprepper.plugins.kafka.consumer.PauseConsumePredicate;
4142
import org.opensearch.dataprepper.plugins.kafka.extension.KafkaClusterConfigSupplier;
4243
import org.opensearch.dataprepper.plugins.kafka.util.ClientDNSLookupType;
@@ -318,25 +319,7 @@ private void setPropertiesForSchemaType(Properties properties, TopicConfig topic
318319
}
319320

320321
private void setConsumerTopicProperties(Properties properties, TopicConsumerConfig topicConfig) {
321-
properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupID);
322-
properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, (int) topicConfig.getMaxPartitionFetchBytes());
323-
properties.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, ((Long) topicConfig.getRetryBackoff().toMillis()).intValue());
324-
properties.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, ((Long) topicConfig.getReconnectBackoff().toMillis()).intValue());
325-
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
326-
topicConfig.getAutoCommit());
327-
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
328-
((Long) topicConfig.getCommitInterval().toMillis()).intValue());
329-
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
330-
topicConfig.getAutoOffsetReset());
331-
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
332-
topicConfig.getConsumerMaxPollRecords());
333-
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
334-
((Long) topicConfig.getMaxPollInterval().toMillis()).intValue());
335-
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, ((Long) topicConfig.getSessionTimeOut().toMillis()).intValue());
336-
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, ((Long) topicConfig.getHeartBeatInterval().toMillis()).intValue());
337-
properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, (int) topicConfig.getFetchMaxBytes());
338-
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, topicConfig.getFetchMaxWait());
339-
properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, (int) topicConfig.getFetchMinBytes());
322+
KafkaCustomConsumerFactory.setConsumerTopicProperties(properties, topicConfig, consumerGroupID);
340323
}
341324

342325
private void setPropertiesForSchemaRegistryConnectivity(Properties properties) {

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/SourceTopicConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ class SourceTopicConfig extends CommonTopicConfig implements TopicConsumerConfig
4949
@Size(min = 1, max = 255, message = "size of group id should be between 1 and 255")
5050
private String groupId;
5151

52+
@JsonProperty("client_id")
53+
@Valid
54+
@Size(min = 1, max = 255, message = "size of client id should be between 1 and 255")
55+
private String clientId;
56+
5257
@JsonProperty("workers")
5358
@Valid
5459
@Size(min = 1, max = 200, message = "Number of worker threads should lies between 1 and 200")
@@ -121,6 +126,11 @@ public String getGroupId() {
121126
return groupId;
122127
}
123128

129+
@Override
130+
public String getClientId() {
131+
return clientId;
132+
}
133+
124134
@Override
125135
public MessageFormat getSerdeFormat() {
126136
return serdeFormat;

data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ class KafkaSourceTest {
8282
private PluginConfigObservable pluginConfigObservable;
8383

8484
private static final String TEST_GROUP_ID = "testGroupId";
85+
private static final String TEST_CLIENT_ID = "testClientId";
8586

8687
public KafkaSource createObjectUnderTest() {
8788
return new KafkaSource(
@@ -107,6 +108,8 @@ void setUp() throws Exception {
107108
when(topic2.getConsumerMaxPollRecords()).thenReturn(1);
108109
when(topic1.getGroupId()).thenReturn(TEST_GROUP_ID);
109110
when(topic2.getGroupId()).thenReturn(TEST_GROUP_ID);
111+
when(topic1.getClientId()).thenReturn(TEST_CLIENT_ID);
112+
when(topic2.getClientId()).thenReturn(TEST_CLIENT_ID);
110113
when(topic1.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5));
111114
when(topic2.getMaxPollInterval()).thenReturn(Duration.ofSeconds(5));
112115
when(topic1.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(5));
@@ -156,6 +159,18 @@ void test_kafkaSource_basicFunctionality() {
156159
assertTrue(Objects.nonNull(kafkaSource.getConsumer()));
157160
}
158161

162+
@Test
163+
void test_kafkaSource_basicFunctionalityWithClientIdNull() {
164+
when(topic1.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));
165+
when(topic2.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));
166+
when(topic1.getClientId()).thenReturn(null);
167+
when(topic1.getClientId()).thenReturn(null);
168+
kafkaSource = createObjectUnderTest();
169+
assertTrue(Objects.nonNull(kafkaSource));
170+
kafkaSource.start(buffer);
171+
assertTrue(Objects.nonNull(kafkaSource.getConsumer()));
172+
}
173+
159174
@Test
160175
void test_kafkaSource_retry_consumer_create() throws InterruptedException {
161176
when(topic1.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15));

0 commit comments

Comments
 (0)