Skip to content

Commit cbe23a3

Browse files
authored
[fix](job) fix NPE in routine load Kafka meta request (#63180)
### What problem does this PR solve? Problem Summary: In a single-BE deployment, Kafka routine load fetches topic metadata through the only BE. If that BE cannot connect to Kafka, the metadata request fails and the BE is skipped in the current retry loop. Then FE may have no normal candidate backend left and falls back to backend ids in the routine load blacklist. The blacklist can contain stale backend ids that no longer exist in `SystemInfoService`. In that case, `KafkaUtil` may get a null `Backend` and throw a NullPointerException when calling `be.getHost()`. This hides the real Kafka metadata error, such as broker connection failure. This PR filters stale backend ids when reading the routine load blacklist and adds a final null check before creating the BE address. The original Kafka metadata error is preserved instead of being replaced by the secondary NPE. A regression case is added with an invalid `kafka_broker_list` to verify that routine load reports the expected Kafka metadata error path.
1 parent 90ec8ad commit cbe23a3

2 files changed

Lines changed: 76 additions & 3 deletions

File tree

fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -252,8 +252,16 @@ private static InternalService.PProxyResult getInfoRequest(InternalService.PProx
252252
// 2. If that sole backend is decommissioned, the aliveBackends list becomes empty.
253253
// Hence, in such cases, it's essential to rely on the blacklist to obtain meta information.
254254
if (backendIds.isEmpty()) {
255-
for (Long beId : Env.getCurrentEnv().getRoutineLoadManager().getBlacklist().keySet()) {
256-
backendIds.add(beId);
255+
Map<Long, Long> blacklist = Env.getCurrentEnv().getRoutineLoadManager().getBlacklist();
256+
for (Long beId : blacklist.keySet()) {
257+
Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
258+
if (backend != null) {
259+
backendIds.add(beId);
260+
} else {
261+
blacklist.remove(beId);
262+
LOG.warn("remove stale backend {} from routine load blacklist when getting kafka meta",
263+
beId);
264+
}
257265
}
258266
}
259267
if (backendIds.isEmpty()) {
@@ -264,7 +272,16 @@ private static InternalService.PProxyResult getInfoRequest(InternalService.PProx
264272
throw new LoadException("failed to get info: " + errorMsg + ",");
265273
}
266274
Collections.shuffle(backendIds);
267-
Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
275+
long selectedBeId = backendIds.get(0);
276+
Backend be = Env.getCurrentSystemInfo().getBackend(selectedBeId);
277+
if (be == null) {
278+
if (errorMsg == null) {
279+
errorMsg = "backend " + selectedBeId + " does not exist";
280+
}
281+
LOG.warn("skip stale backend {} when getting kafka meta", selectedBeId);
282+
retryTimes++;
283+
continue;
284+
}
268285
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
269286
long beId = be.getId();
270287

regression-test/suites/load_p0/routine_load/test_black_list.groovy

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,5 +150,61 @@ suite("test_black_list","nonConcurrent,p0") {
150150
GetDebugPoint().disableDebugPointForAllBEs(inject)
151151
sql "stop routine load for ${job}"
152152
}
153+
154+
def invalidBrokerTableName = "test_black_list_invalid_broker"
155+
def invalidBrokerJob = "test_black_list_invalid_broker_job"
156+
sql """ DROP TABLE IF EXISTS ${invalidBrokerTableName} """
157+
sql """
158+
CREATE TABLE IF NOT EXISTS ${invalidBrokerTableName} (
159+
`k1` int(20) NULL,
160+
`k2` string NULL,
161+
`v1` date NULL,
162+
`v2` string NULL,
163+
`v3` datetime NULL,
164+
`v4` string NULL
165+
) ENGINE=OLAP
166+
DUPLICATE KEY(`k1`)
167+
COMMENT 'OLAP'
168+
DISTRIBUTED BY HASH(`k1`) BUCKETS 3
169+
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
170+
"""
171+
172+
try {
173+
sql """
174+
CREATE ROUTINE LOAD ${invalidBrokerJob} ON ${invalidBrokerTableName}
175+
COLUMNS TERMINATED BY ","
176+
FROM KAFKA
177+
(
178+
"kafka_broker_list" = "127.0.0.1:1",
179+
"kafka_topic" = "${kafkaCsvTpoics[0]}",
180+
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
181+
);
182+
"""
183+
184+
def count = 0
185+
while (true) {
186+
sleep(1000)
187+
def state = sql "show routine load for ${invalidBrokerJob}"
188+
def stateChangedReason = state[0][17].toString()
189+
def otherMsg = state[0][19].toString()
190+
def errorMsg = "${stateChangedReason} ${otherMsg}"
191+
log.info("invalid broker routine load state: ${state[0][8].toString()}".toString())
192+
log.info("invalid broker reason of state changed: ${stateChangedReason}".toString())
193+
log.info("invalid broker other msg: ${otherMsg}".toString())
194+
if (errorMsg.contains("Failed to get all partitions of kafka topic")) {
195+
assertTrue(errorMsg.contains("failed to get info"))
196+
assertTrue(errorMsg.contains("failed to get partition meta: Local: Broker transport failure"))
197+
break
198+
}
199+
if (count >= 90) {
200+
log.error("routine load invalid broker test fail")
201+
assertEquals(1, 2)
202+
break
203+
}
204+
count++
205+
}
206+
} finally {
207+
try_sql "stop routine load for ${invalidBrokerJob}"
208+
}
153209
}
154210
}

0 commit comments

Comments
 (0)