Skip to content

Commit 182820f

Browse files
authored
branch-4.0: [enhance](job) add zero-row hint for Kafka read_committed load and [opt](job) delay Kafka read committed zero-row retries (#64584)
pick #63664; pick #64046
1 parent 4ab6261 commit 182820f

5 files changed

Lines changed: 220 additions & 50 deletions

File tree

fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.doris.common.LoadException;
3232
import org.apache.doris.common.Pair;
3333
import org.apache.doris.common.UserException;
34+
import org.apache.doris.common.util.DebugPointUtil;
3435
import org.apache.doris.common.util.DebugUtil;
3536
import org.apache.doris.common.util.LogBuilder;
3637
import org.apache.doris.common.util.LogKey;
@@ -89,6 +90,12 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
8990

9091
public static final String KAFKA_FILE_CATALOG = "kafka";
9192
public static final String PROP_GROUP_ID = "group.id";
93+
private static final String KAFKA_ISOLATION_LEVEL = "isolation.level";
94+
private static final String KAFKA_READ_COMMITTED = "read_committed";
95+
private static final String HAS_POSITIVE_LAG_DEBUG_POINT = "KafkaRoutineLoadJob.hasPositiveLagForTask";
96+
private static final String READ_COMMITTED_ZERO_ROWS_WITH_LAG_MESSAGE = "Kafka routine load consumed 0 rows "
97+
+ "while lag is still positive under isolation.level=read_committed. If the upstream producer uses "
98+
+ "Kafka transactions, some records may be in uncommitted transactions and are not visible yet.";
9299

93100
@SerializedName("bl")
94101
private String brokerList;
@@ -357,6 +364,33 @@ private void updateLatestOffsetsCache(List<Pair<Integer, Long>> latestOffsets, U
357364
protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws UserException {
358365
updateProgressAndOffsetsCache(attachment);
359366
super.updateProgress(attachment);
367+
updateReadCommittedLagHint(attachment);
368+
}
369+
370+
private void updateReadCommittedLagHint(RLTaskTxnCommitAttachment attachment) {
371+
if (shouldDelayScheduleForReadCommittedZeroRowsWithLag(attachment)) {
372+
setOtherMsg(READ_COMMITTED_ZERO_ROWS_WITH_LAG_MESSAGE);
373+
}
374+
}
375+
376+
boolean shouldDelayScheduleForReadCommittedZeroRowsWithLag(RLTaskTxnCommitAttachment attachment) {
377+
return DebugPointUtil.isEnable(HAS_POSITIVE_LAG_DEBUG_POINT)
378+
|| (attachment.getTotalRows() == 0 && isReadCommitted() && hasPositiveLagForTask(attachment));
379+
}
380+
381+
private boolean isReadCommitted() {
382+
return KAFKA_READ_COMMITTED.equalsIgnoreCase(customProperties.get(KAFKA_ISOLATION_LEVEL));
383+
}
384+
385+
private boolean hasPositiveLagForTask(RLTaskTxnCommitAttachment attachment) {
386+
Map<Integer, Long> partitionIdToOffset = ((KafkaProgress) attachment.getProgress()).getOffsetByPartition();
387+
for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
388+
Long latestOffset = cachedPartitionWithLatestOffsets.get(entry.getKey());
389+
if (latestOffset != null && latestOffset > entry.getValue() + 1) {
390+
return true;
391+
}
392+
}
393+
return false;
360394
}
361395

362396
@Override
@@ -377,7 +411,7 @@ protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoad
377411
// add new task
378412
KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(oldKafkaTaskInfo,
379413
((KafkaProgress) progress).getPartitionIdToOffset(oldKafkaTaskInfo.getPartitions()), isMultiTable());
380-
kafkaTaskInfo.setDelaySchedule(delaySchedule);
414+
kafkaTaskInfo.setDelaySchedule(delaySchedule || oldKafkaTaskInfo.isDelaySchedule());
381415
// remove old task
382416
routineLoadTaskInfoList.remove(routineLoadTaskInfo);
383417
// add new task

fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,12 @@ boolean hasMoreDataToConsume() throws UserException {
159159
return routineLoadJob.hasMoreDataToConsume(id, partitionIdToOffset);
160160
}
161161

162+
@Override
163+
protected boolean shouldDelaySchedule(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) {
164+
KafkaRoutineLoadJob routineLoadJob = (KafkaRoutineLoadJob) routineLoadManager.getJob(jobId);
165+
return routineLoadJob.shouldDelayScheduleForReadCommittedZeroRowsWithLag(rlTaskTxnCommitAttachment);
166+
}
167+
162168
private TPipelineFragmentParams rePlan(RoutineLoadJob routineLoadJob) throws UserException {
163169
TUniqueId loadId = new TUniqueId(id.getMostSignificantBits(), id.getLeastSignificantBits());
164170
// plan for each task, in case table has change(rollup or schema change)

fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,10 @@ public boolean getIsEof() {
158158
return isEof;
159159
}
160160

161+
public boolean isDelaySchedule() {
162+
return delaySchedule;
163+
}
164+
161165
public boolean needDedalySchedule() {
162166
return delaySchedule || isEof;
163167
}
@@ -178,6 +182,7 @@ public boolean isTimeout() {
178182

179183
public void handleTaskByTxnCommitAttachment(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) {
180184
judgeEof(rlTaskTxnCommitAttachment);
185+
this.delaySchedule = shouldDelaySchedule(rlTaskTxnCommitAttachment);
181186
}
182187

183188
private void judgeEof(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) {
@@ -195,7 +200,11 @@ private void judgeEof(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) {
195200
}
196201
}
197202

198-
abstract TRoutineLoadTask createRoutineLoadTask() throws UserException;
203+
protected boolean shouldDelaySchedule(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) {
204+
return false;
205+
}
206+
207+
protected abstract TRoutineLoadTask createRoutineLoadTask() throws UserException;
199208

200209
public void updateAdaptiveTimeout(RoutineLoadJob routineLoadJob) {
201210
}

fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.doris.common.Pair;
3535
import org.apache.doris.common.UserException;
3636
import org.apache.doris.common.jmockit.Deencapsulation;
37+
import org.apache.doris.common.jmockit.FieldReflection;
3738
import org.apache.doris.datasource.kafka.KafkaUtil;
3839
import org.apache.doris.load.RoutineLoadDesc;
3940
import org.apache.doris.load.loadv2.LoadTask;
@@ -231,6 +232,81 @@ public List<Pair<Integer, Long>> getLatestOffsets(long jobId, UUID taskId, Strin
231232
Assert.assertEquals(15L, routineLoadJob.totalLag().longValue());
232233
}
233234

235+
@Test
236+
public void testUpdateProgressWarnsWhenReadCommittedTaskHasZeroRowsAndLag() throws UserException {
237+
KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", 1L,
238+
1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
239+
Map<String, String> customProperties = Maps.newHashMap();
240+
customProperties.put("isolation.level", "read_committed");
241+
Deencapsulation.setField(routineLoadJob, "customProperties", customProperties);
242+
243+
Map<Integer, Long> cachedPartitionWithLatestOffsets = Maps.newHashMap();
244+
cachedPartitionWithLatestOffsets.put(1, 20L);
245+
Deencapsulation.setField(routineLoadJob, "cachedPartitionWithLatestOffsets",
246+
cachedPartitionWithLatestOffsets);
247+
248+
Map<Integer, Long> taskProgress = Maps.newHashMap();
249+
taskProgress.put(1, 10L);
250+
RLTaskTxnCommitAttachment attachment = new RLTaskTxnCommitAttachment();
251+
Deencapsulation.setField(attachment, "progress", new KafkaProgress(taskProgress));
252+
253+
Deencapsulation.invoke(routineLoadJob, "updateProgress", attachment);
254+
255+
String otherMsg = Deencapsulation.getField(routineLoadJob, "otherMsg");
256+
Assert.assertTrue(otherMsg.contains("some records may be in uncommitted transactions"));
257+
}
258+
259+
@Test
260+
public void testReadCommittedZeroRowsWithLagDelaysNextTask(
261+
@Injectable RoutineLoadManager routineLoadManager) throws UserException {
262+
KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", 1L,
263+
1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
264+
new Expectations() {
265+
{
266+
routineLoadManager.getJob(1L);
267+
minTimes = 0;
268+
result = routineLoadJob;
269+
}
270+
};
271+
272+
Map<String, String> customProperties = Maps.newHashMap();
273+
customProperties.put("isolation.level", "read_committed");
274+
Deencapsulation.setField(routineLoadJob, "customProperties", customProperties);
275+
276+
Map<Integer, Long> cachedPartitionWithLatestOffsets = Maps.newHashMap();
277+
cachedPartitionWithLatestOffsets.put(1, 20L);
278+
Deencapsulation.setField(routineLoadJob, "cachedPartitionWithLatestOffsets",
279+
cachedPartitionWithLatestOffsets);
280+
281+
Map<Integer, Long> taskProgress = Maps.newHashMap();
282+
taskProgress.put(1, 10L);
283+
Deencapsulation.setField(routineLoadJob, "progress", new KafkaProgress(taskProgress));
284+
285+
KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L, 20000,
286+
taskProgress, false, 1000, false);
287+
FieldReflection.setField(RoutineLoadTaskInfo.class, kafkaTaskInfo, "routineLoadManager",
288+
routineLoadManager);
289+
FieldReflection.setField(KafkaTaskInfo.class, kafkaTaskInfo, "routineLoadManager",
290+
routineLoadManager);
291+
List<RoutineLoadTaskInfo> routineLoadTaskInfoList = new ArrayList<>();
292+
routineLoadTaskInfoList.add(kafkaTaskInfo);
293+
Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList", routineLoadTaskInfoList);
294+
295+
RLTaskTxnCommitAttachment attachment = new RLTaskTxnCommitAttachment();
296+
Deencapsulation.setField(attachment, "progress", new KafkaProgress(taskProgress));
297+
Deencapsulation.setField(attachment, "taskExecutionTimeMs",
298+
routineLoadJob.getMaxBatchIntervalS() * 1000);
299+
300+
kafkaTaskInfo.handleTaskByTxnCommitAttachment(attachment);
301+
302+
Assert.assertFalse(kafkaTaskInfo.getIsEof());
303+
Assert.assertTrue(kafkaTaskInfo.needDedalySchedule());
304+
305+
RoutineLoadTaskInfo newTask = Deencapsulation.invoke(routineLoadJob,
306+
"unprotectRenewTask", kafkaTaskInfo, false);
307+
Assert.assertTrue(newTask.needDedalySchedule());
308+
}
309+
234310
@Test
235311
public void testUpdateLagRebuildsConvertedPropertiesAfterReplay(@Mocked Env env) throws UserException {
236312
KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", 1L,

regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy

Lines changed: 93 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
import org.apache.doris.regression.util.RoutineLoadTestUtils
1819
import org.apache.kafka.clients.admin.AdminClient
19-
import org.apache.kafka.clients.producer.KafkaProducer
20-
import org.apache.kafka.clients.producer.ProducerRecord
20+
import org.apache.kafka.clients.admin.NewTopic
2121
import org.apache.kafka.clients.producer.ProducerConfig
2222

2323
suite("test_routine_load_error_info","nonConcurrent") {
@@ -30,52 +30,19 @@ suite("test_routine_load_error_info","nonConcurrent") {
3030
"test_error_info",
3131
]
3232

33-
String enabled = context.config.otherConfigs.get("enableKafkaTest")
34-
String kafka_port = context.config.otherConfigs.get("kafka_port")
35-
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
36-
def kafka_broker = "${externalEnvIp}:${kafka_port}"
33+
def kafkaTestEnabled = RoutineLoadTestUtils.isKafkaTestEnabled(context)
34+
def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
3735

3836
// send data to kafka
39-
if (enabled != null && enabled.equalsIgnoreCase("true")) {
40-
def props = new Properties()
41-
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
42-
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
43-
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
44-
// add timeout config
45-
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
46-
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
47-
48-
// check conenction
49-
def verifyKafkaConnection = { prod ->
50-
try {
51-
logger.info("=====try to connect Kafka========")
52-
def partitions = prod.partitionsFor("__connection_verification_topic")
53-
return partitions != null
54-
} catch (Exception e) {
55-
throw new Exception("Kafka connect fail: ${e.message}".toString())
56-
}
57-
}
58-
// Create kafka producer
59-
def producer = new KafkaProducer<>(props)
37+
if (kafkaTestEnabled) {
38+
def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
6039
try {
61-
logger.info("Kafka connecting: ${kafka_broker}")
62-
if (!verifyKafkaConnection(producer)) {
63-
throw new Exception("can't get any kafka info")
40+
for (String kafkaCsvTopic in kafkaCsvTpoics) {
41+
def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
42+
RoutineLoadTestUtils.sendTestDataToKafka(producer, [kafkaCsvTopic], txt.readLines())
6443
}
65-
} catch (Exception e) {
66-
logger.error("FATAL: " + e.getMessage())
44+
} finally {
6745
producer.close()
68-
throw e
69-
}
70-
logger.info("Kafka connect success")
71-
for (String kafkaCsvTopic in kafkaCsvTpoics) {
72-
def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
73-
def lines = txt.readLines()
74-
lines.each { line ->
75-
logger.info("=====${line}========")
76-
def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
77-
producer.send(record)
78-
}
7946
}
8047
}
8148

@@ -165,15 +132,47 @@ suite("test_routine_load_error_info","nonConcurrent") {
165132
)
166133
FROM KAFKA
167134
(
168-
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
135+
"kafka_broker_list" = "${kafka_broker}",
169136
"kafka_topic" = "${kafkaTopic}",
170137
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
171138
);
172139
"""
173140
}
174141

142+
def createReadCommittedJob = {jobName, tableName, kafkaTopic ->
143+
sql """
144+
CREATE ROUTINE LOAD ${jobName} on ${tableName}
145+
COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18),
146+
COLUMNS TERMINATED BY "|"
147+
PROPERTIES
148+
(
149+
"max_batch_interval" = "5",
150+
"max_batch_rows" = "300000",
151+
"max_batch_size" = "209715200"
152+
)
153+
FROM KAFKA
154+
(
155+
"kafka_broker_list" = "${kafka_broker}",
156+
"kafka_topic" = "${kafkaTopic}",
157+
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
158+
"property.isolation.level" = "read_committed"
159+
);
160+
"""
161+
}
162+
163+
def createKafkaTopic = {kafkaTopic ->
164+
def adminProps = new Properties()
165+
adminProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
166+
def adminClient = AdminClient.create(adminProps)
167+
try {
168+
adminClient.createTopics([new NewTopic(kafkaTopic, 1, (short) 1)]).all().get()
169+
} finally {
170+
adminClient.close()
171+
}
172+
}
173+
175174
// case 1: task failed
176-
if (enabled != null && enabled.equalsIgnoreCase("true")) {
175+
if (kafkaTestEnabled) {
177176
// create table
178177
def jobName = "test_error_info"
179178
def tableName = "test_routine_error_info"
@@ -209,7 +208,7 @@ suite("test_routine_load_error_info","nonConcurrent") {
209208
}
210209

211210
// case 2: reschedule job
212-
if (enabled != null && enabled.equalsIgnoreCase("true")) {
211+
if (kafkaTestEnabled) {
213212
def jobName = "test_error_info"
214213
def tableName = "test_routine_error_info"
215214
try {
@@ -242,7 +241,7 @@ suite("test_routine_load_error_info","nonConcurrent") {
242241
}
243242

244243
// case 3: memory limit
245-
if (enabled != null && enabled.equalsIgnoreCase("true")) {
244+
if (kafkaTestEnabled) {
246245
def jobName = "test_memory_limit_error_info"
247246
def tableName = "test_routine_memory_limit_error_info"
248247

@@ -276,4 +275,50 @@ suite("test_routine_load_error_info","nonConcurrent") {
276275
sql "DROP TABLE IF EXISTS ${tableName}"
277276
}
278277
}
279-
}
278+
279+
// case 4: read_committed lag hint
280+
if (kafkaTestEnabled) {
281+
def jobName = "test_read_committed_lag_error_info"
282+
def tableName = "test_routine_read_committed_lag_error_info"
283+
def kafkaTopic = "test_read_committed_lag_error_info_${System.currentTimeMillis()}"
284+
def debugPoint = "KafkaRoutineLoadJob.hasPositiveLagForTask"
285+
286+
try {
287+
createKafkaTopic(kafkaTopic)
288+
def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
289+
try {
290+
def txt = new File("""${context.file.parent}/data/${kafkaCsvTpoics[0]}.csv""").text
291+
RoutineLoadTestUtils.sendTestDataToKafka(producer, [kafkaTopic], txt.readLines())
292+
} finally {
293+
producer.close()
294+
}
295+
createTable(tableName)
296+
sql "sync"
297+
GetDebugPoint().enableDebugPointForAllFEs(debugPoint)
298+
createReadCommittedJob(jobName, tableName, kafkaTopic)
299+
sql "sync"
300+
301+
// check error info
302+
def count = 0
303+
while (true) {
304+
def res = sql "show routine load for ${jobName}"
305+
log.info("show routine load: ${res[0].toString()}".toString())
306+
log.info("other msg: ${res[0][19].toString()}".toString())
307+
if (res[0][19].toString() != "") {
308+
assertTrue(res[0][19].toString().contains("some records may be in uncommitted transactions"))
309+
break;
310+
}
311+
count++
312+
if (count > 60) {
313+
assertEquals(1, 2)
314+
break;
315+
}
316+
sleep(1000)
317+
}
318+
} finally {
319+
GetDebugPoint().disableDebugPointForAllFEs(debugPoint)
320+
sql "stop routine load for ${jobName}"
321+
sql "DROP TABLE IF EXISTS ${tableName}"
322+
}
323+
}
324+
}

0 commit comments

Comments
 (0)