Skip to content

Commit cc2224b

Browse files
authored
[fix](streaming-job) keep isCanceled set when cancel runs on terminal task (#63427)
### What problem does this PR solve? Streaming insert job (CDC source / JdbcSourceOffsetProvider) can become permanently stuck in PAUSED when a BE-side commit arrives after FE-side task timeout. Symptoms observed in production: - Job status PAUSED with empty ErrorMsg / JobRuntimeMsg. - Latest task PENDING and never scheduled (scheduler logs "do not need to schedule invalid task ... job status: PAUSED"). - Previous task status SUCCESS but its ErrorMsg = "task failed cause timeout". - `auto resume` never recovers the job; only manual `RESUME JOB` works.
1 parent 2eff9ea commit cc2224b

3 files changed

Lines changed: 130 additions & 4 deletions

File tree

fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/AbstractStreamingTask.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -140,15 +140,15 @@ protected boolean isCallable() {
140140
}
141141

142142
public void cancel(boolean needWaitCancelComplete) {
143+
// Flip isCanceled even on terminal states so late BE callbacks short-circuit.
144+
if (getIsCanceled().getAndSet(true)) {
145+
return;
146+
}
143147
if (TaskStatus.SUCCESS.equals(status) || TaskStatus.FAILED.equals(status)
144148
|| TaskStatus.CANCELED.equals(status)) {
145149
return;
146150
}
147151
status = TaskStatus.CANCELED;
148-
if (getIsCanceled().get()) {
149-
return;
150-
}
151-
getIsCanceled().getAndSet(true);
152152
this.errMsg = "task cancelled";
153153
}
154154

fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1437,6 +1437,12 @@ public void commitOffset(CommitOffsetRequest offsetRequest) throws JobException
14371437
try {
14381438
if (this.runningStreamTask != null
14391439
&& this.runningStreamTask instanceof StreamingMultiTblTask) {
1440+
if (this.runningStreamTask.getIsCanceled().get()) {
1441+
log.info("Streaming multi table job {} skip late commit offset on canceled task "
1442+
+ "(expected: {}, actual: {})",
1443+
getJobId(), this.runningStreamTask.getTaskId(), offsetRequest.getTaskId());
1444+
return;
1445+
}
14401446
if (this.runningStreamTask.getTaskId() != offsetRequest.getTaskId()) {
14411447
throw new JobException("Task id mismatch when commit offset. expected: "
14421448
+ this.runningStreamTask.getTaskId() + ", actual: " + offsetRequest.getTaskId());
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.job.extensions.insert.streaming;
19+
20+
import org.apache.doris.common.jmockit.Deencapsulation;
21+
import org.apache.doris.job.cdc.request.CommitOffsetRequest;
22+
import org.apache.doris.job.common.JobStatus;
23+
import org.apache.doris.job.common.TaskStatus;
24+
import org.apache.doris.job.offset.jdbc.JdbcSourceOffsetProvider;
25+
26+
import org.junit.Assert;
27+
import org.junit.Test;
28+
29+
import java.util.HashMap;
30+
import java.util.concurrent.locks.ReentrantReadWriteLock;
31+
32+
public class StreamingInsertJobLateCallbackTest {
33+
34+
private static StreamingMultiTblTask newTask(long taskId, TaskStatus initialStatus) {
35+
StreamingJobProperties jobProps = new StreamingJobProperties(new HashMap<>());
36+
StreamingMultiTblTask task = new StreamingMultiTblTask(
37+
0L, taskId, null, null, null, null, null, jobProps, null, null);
38+
Deencapsulation.setField(task, "status", initialStatus);
39+
return task;
40+
}
41+
42+
@Test
43+
public void testCancelMarksIsCanceledOnFailedTask() {
44+
StreamingMultiTblTask task = newTask(1001L, TaskStatus.FAILED);
45+
46+
task.cancel(true);
47+
48+
Assert.assertTrue("isCanceled must flip even when task already FAILED",
49+
task.getIsCanceled().get());
50+
Assert.assertEquals("status preserved when already terminal",
51+
TaskStatus.FAILED, task.getStatus());
52+
}
53+
54+
@Test
55+
public void testCancelMarksIsCanceledOnSuccessTask() {
56+
StreamingMultiTblTask task = newTask(1002L, TaskStatus.SUCCESS);
57+
58+
task.cancel(true);
59+
60+
Assert.assertTrue(task.getIsCanceled().get());
61+
Assert.assertEquals(TaskStatus.SUCCESS, task.getStatus());
62+
}
63+
64+
@Test
65+
public void testCancelTransitionsRunningToCanceled() {
66+
StreamingMultiTblTask task = newTask(1003L, TaskStatus.RUNNING);
67+
68+
task.cancel(true);
69+
70+
Assert.assertTrue(task.getIsCanceled().get());
71+
Assert.assertEquals(TaskStatus.CANCELED, task.getStatus());
72+
}
73+
74+
@Test
75+
public void testCancelIdempotent() {
76+
StreamingMultiTblTask task = newTask(1004L, TaskStatus.RUNNING);
77+
78+
task.cancel(true);
79+
Assert.assertEquals(TaskStatus.CANCELED, task.getStatus());
80+
Assert.assertTrue(task.getIsCanceled().get());
81+
82+
Deencapsulation.setField(task, "errMsg", "first cancel");
83+
task.cancel(true);
84+
Assert.assertEquals("second cancel must early-return and leave state untouched",
85+
"first cancel", Deencapsulation.getField(task, "errMsg"));
86+
}
87+
88+
@Test
89+
public void testCommitOffsetSkipsCanceledTask() throws Exception {
90+
StreamingInsertJob job = Deencapsulation.newInstance(StreamingInsertJob.class);
91+
Deencapsulation.setField(job, "lock", new ReentrantReadWriteLock(true));
92+
Deencapsulation.setField(job, "jobId", 9001L);
93+
Deencapsulation.setField(job, "jobName", "test_job");
94+
Deencapsulation.setField(job, "jobStatus", JobStatus.PAUSED);
95+
96+
JdbcSourceOffsetProvider provider = Deencapsulation.newInstance(JdbcSourceOffsetProvider.class);
97+
Deencapsulation.setField(job, "offsetProvider", provider);
98+
99+
StreamingMultiTblTask task = newTask(7777L, TaskStatus.FAILED);
100+
// simulate the bug timeline: task already FAILED via onFail, then cancel marks isCanceled.
101+
task.cancel(true);
102+
Assert.assertTrue(task.getIsCanceled().get());
103+
104+
Deencapsulation.setField(job, "runningStreamTask", task);
105+
106+
CommitOffsetRequest req = new CommitOffsetRequest();
107+
req.setJobId(9001L);
108+
req.setTaskId(7777L);
109+
req.setOffset("[{\"splitId\":\"binlog-split\"}]");
110+
req.setScannedRows(123L);
111+
req.setLoadBytes(456L);
112+
113+
// Should silently skip — no JobException, no Task.status flip back to SUCCESS,
114+
// no successCallback side-effects.
115+
job.commitOffset(req);
116+
117+
Assert.assertEquals("task status must stay terminal — late callback ignored",
118+
TaskStatus.FAILED, task.getStatus());
119+
}
120+
}

0 commit comments

Comments
 (0)