Skip to content

Commit bbff29d

Browse files
authored
[fix][io] Kafka Source connector maybe stuck (#22511)
1 parent 20915d1 commit bbff29d

2 files changed

Lines changed: 116 additions & 1 deletion

File tree

pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.Optional;
2828
import java.util.Properties;
2929
import java.util.concurrent.CompletableFuture;
30+
import java.util.concurrent.TimeUnit;
3031
import lombok.Getter;
3132
import lombok.extern.slf4j.Slf4j;
3233
import org.apache.commons.lang3.StringUtils;
@@ -63,6 +64,7 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> {
6364
private volatile boolean running = false;
6465
private KafkaSourceConfig kafkaSourceConfig;
6566
private Thread runnerThread;
67+
private long maxPollIntervalMs;
6668

6769
@Override
6870
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
@@ -126,6 +128,13 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
126128
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaSourceConfig.getAutoOffsetReset());
127129
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaSourceConfig.getKeyDeserializationClass());
128130
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaSourceConfig.getValueDeserializationClass());
131+
if (props.containsKey(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) {
132+
maxPollIntervalMs = Long.parseLong(props.get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG).toString());
133+
} else {
134+
maxPollIntervalMs = Long.parseLong(
135+
ConsumerConfig.configDef().defaultValues().get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)
136+
.toString());
137+
}
129138
try {
130139
consumer = new KafkaConsumer<>(beforeCreateConsumer(props));
131140
} catch (Exception ex) {
@@ -175,7 +184,9 @@ public void start() {
175184
index++;
176185
}
177186
if (!kafkaSourceConfig.isAutoCommitEnabled()) {
178-
CompletableFuture.allOf(futures).get();
187+
// Wait for at most about 2/3 of maxPollIntervalMs for the outstanding records,
188+
// so that we never block long enough to be kicked out of the consumer group for exceeding the poll interval.
189+
CompletableFuture.allOf(futures).get(maxPollIntervalMs * 2 / 3, TimeUnit.MILLISECONDS);
179190
consumer.commitSync();
180191
}
181192
} catch (Exception e) {
@@ -253,6 +264,21 @@ public void ack() {
253264
completableFuture.complete(null);
254265
}
255266

267+
@Override
268+
public void fail() {
269+
completableFuture.completeExceptionally(
270+
new RuntimeException(
271+
String.format(
272+
"Failed to process record with kafka topic: %s partition: %d offset: %d key: %s",
273+
record.topic(),
274+
record.partition(),
275+
record.offset(),
276+
getKey()
277+
)
278+
)
279+
);
280+
}
281+
256282
@Override
257283
public Schema<V> getSchema() {
258284
return schema;

pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,18 @@
2121
import com.google.common.collect.ImmutableMap;
2222
import java.time.Duration;
2323
import java.util.Collections;
24+
import java.util.Arrays;
2425
import java.lang.reflect.Field;
26+
import java.util.concurrent.ExecutionException;
27+
import java.util.concurrent.TimeoutException;
2528
import org.apache.kafka.clients.consumer.Consumer;
2629
import org.apache.kafka.clients.consumer.ConsumerConfig;
2730
import org.apache.kafka.clients.consumer.ConsumerRecord;
31+
import org.apache.kafka.clients.consumer.ConsumerRecords;
32+
import org.apache.kafka.common.TopicPartition;
2833
import org.apache.kafka.common.security.auth.SecurityProtocol;
2934
import org.apache.pulsar.client.api.Schema;
35+
import org.apache.pulsar.functions.api.Record;
3036
import org.apache.pulsar.io.core.SourceContext;
3137
import org.apache.pulsar.io.kafka.KafkaAbstractSource;
3238
import org.apache.pulsar.io.kafka.KafkaSourceConfig;
@@ -46,6 +52,7 @@
4652
import static org.testng.Assert.assertFalse;
4753
import static org.testng.Assert.assertNotNull;
4854
import static org.testng.Assert.assertNull;
55+
import static org.testng.Assert.assertTrue;
4956
import static org.testng.Assert.expectThrows;
5057
import static org.testng.Assert.fail;
5158

@@ -218,6 +225,88 @@ public final void throwExceptionByPoll() throws Exception {
218225
source.read();
219226
}
220227

228+
@Test
229+
public final void throwExceptionBySendFail() throws Exception {
230+
KafkaAbstractSource source = new DummySource();
231+
232+
KafkaSourceConfig kafkaSourceConfig = new KafkaSourceConfig();
233+
kafkaSourceConfig.setTopic("test-topic");
234+
kafkaSourceConfig.setAutoCommitEnabled(false);
235+
Field kafkaSourceConfigField = KafkaAbstractSource.class.getDeclaredField("kafkaSourceConfig");
236+
kafkaSourceConfigField.setAccessible(true);
237+
kafkaSourceConfigField.set(source, kafkaSourceConfig);
238+
239+
Field defaultMaxPollIntervalMsField = KafkaAbstractSource.class.getDeclaredField("maxPollIntervalMs");
240+
defaultMaxPollIntervalMsField.setAccessible(true);
241+
defaultMaxPollIntervalMsField.set(source, 300000);
242+
243+
Consumer consumer = mock(Consumer.class);
244+
ConsumerRecord<String, byte[]> consumerRecord = new ConsumerRecord<>("topic", 0, 0,
245+
"t-key", "t-value".getBytes(StandardCharsets.UTF_8));
246+
ConsumerRecords<String, byte[]> consumerRecords = new ConsumerRecords<>(Collections.singletonMap(
247+
new TopicPartition("topic", 0),
248+
Arrays.asList(consumerRecord)));
249+
Mockito.doReturn(consumerRecords).when(consumer).poll(Mockito.any(Duration.class));
250+
251+
Field consumerField = KafkaAbstractSource.class.getDeclaredField("consumer");
252+
consumerField.setAccessible(true);
253+
consumerField.set(source, consumer);
254+
source.start();
255+
256+
// Mock send message fail
257+
Record record = source.read();
258+
record.fail();
259+
260+
// read again will throw RuntimeException.
261+
try {
262+
source.read();
263+
fail("Should throw exception");
264+
} catch (ExecutionException e) {
265+
assertTrue(e.getCause() instanceof RuntimeException);
266+
assertTrue(e.getCause().getMessage().contains("Failed to process record with kafka topic"));
267+
}
268+
}
269+
270+
@Test
271+
public final void throwExceptionBySendTimeOut() throws Exception {
272+
KafkaAbstractSource source = new DummySource();
273+
274+
KafkaSourceConfig kafkaSourceConfig = new KafkaSourceConfig();
275+
kafkaSourceConfig.setTopic("test-topic");
276+
kafkaSourceConfig.setAutoCommitEnabled(false);
277+
Field kafkaSourceConfigField = KafkaAbstractSource.class.getDeclaredField("kafkaSourceConfig");
278+
kafkaSourceConfigField.setAccessible(true);
279+
kafkaSourceConfigField.set(source, kafkaSourceConfig);
280+
281+
Field defaultMaxPollIntervalMsField = KafkaAbstractSource.class.getDeclaredField("maxPollIntervalMs");
282+
defaultMaxPollIntervalMsField.setAccessible(true);
283+
defaultMaxPollIntervalMsField.set(source, 1);
284+
285+
Consumer consumer = mock(Consumer.class);
286+
ConsumerRecord<String, byte[]> consumerRecord = new ConsumerRecord<>("topic", 0, 0,
287+
"t-key", "t-value".getBytes(StandardCharsets.UTF_8));
288+
ConsumerRecords<String, byte[]> consumerRecords = new ConsumerRecords<>(Collections.singletonMap(
289+
new TopicPartition("topic", 0),
290+
Arrays.asList(consumerRecord)));
291+
Mockito.doReturn(consumerRecords).when(consumer).poll(Mockito.any(Duration.class));
292+
293+
Field consumerField = KafkaAbstractSource.class.getDeclaredField("consumer");
294+
consumerField.setAccessible(true);
295+
consumerField.set(source, consumer);
296+
source.start();
297+
298+
// Mock send message fail, just read do noting.
299+
source.read();
300+
301+
// read again will throw TimeOutException.
302+
try {
303+
source.read();
304+
fail("Should throw exception");
305+
} catch (Exception e) {
306+
assertTrue(e instanceof TimeoutException);
307+
}
308+
}
309+
221310
private File getFile(String name) {
222311
ClassLoader classLoader = getClass().getClassLoader();
223312
return new File(classLoader.getResource(name).getFile());

0 commit comments

Comments
 (0)