Skip to content

Commit ece0572

Browse files
branch-4.1: [fix](streaming-job) fix streaming job properties not parsed after FE restart #62298 (#62432)
Cherry-picked from #62298. Co-authored-by: wudi <wudi@selectdb.com>
1 parent 48bfd04 commit ece0572

File tree

4 files changed

+245
-5
lines changed

4 files changed

+245
-5
lines changed

fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.doris.qe.SessionVariable;
2525
import org.apache.doris.qe.VariableMgr;
2626

27+
import com.google.common.base.Strings;
2728
import lombok.Data;
2829
import org.json.simple.JSONObject;
2930

@@ -60,10 +61,23 @@ public class StreamingJobProperties implements JobProperties {
6061

6162
public StreamingJobProperties(Map<String, String> jobProperties) {
6263
this.properties = jobProperties;
63-
if (properties.isEmpty()) {
64-
this.maxIntervalSecond = DEFAULT_MAX_INTERVAL_SECOND;
65-
this.s3BatchFiles = DEFAULT_MAX_S3_BATCH_FILES;
66-
this.s3BatchBytes = DEFAULT_MAX_S3_BATCH_BYTES;
64+
this.maxIntervalSecond = parseLongOrDefault(
65+
properties.get(MAX_INTERVAL_SECOND_PROPERTY), DEFAULT_MAX_INTERVAL_SECOND);
66+
this.s3BatchFiles = parseLongOrDefault(
67+
properties.get(S3_MAX_BATCH_FILES_PROPERTY), DEFAULT_MAX_S3_BATCH_FILES);
68+
this.s3BatchBytes = parseLongOrDefault(
69+
properties.get(S3_MAX_BATCH_BYTES_PROPERTY), DEFAULT_MAX_S3_BATCH_BYTES);
70+
}
71+
72+
private static long parseLongOrDefault(String valStr, long defaultVal) {
73+
if (Strings.isNullOrEmpty(valStr)) {
74+
return defaultVal;
75+
}
76+
try {
77+
long val = Long.parseLong(valStr);
78+
return val >= 1 ? val : defaultVal;
79+
} catch (NumberFormatException e) {
80+
return defaultVal;
6781
}
6882
}
6983

fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,12 @@ public boolean isTimeout() {
317317
// It's still pending, waiting for scheduling.
318318
return false;
319319
}
320-
return (System.currentTimeMillis() - startTimeMs) > timeoutMs;
320+
long elapsed = System.currentTimeMillis() - startTimeMs;
321+
if (elapsed > timeoutMs) {
322+
log.info("Task {} timeout detected: elapsed={}ms, timeoutMs={}ms", taskId, elapsed, timeoutMs);
323+
return true;
324+
}
325+
return false;
321326
}
322327

323328
/**

fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobPropertiesTest.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.doris.job.extensions.insert.streaming;
1919

20+
import org.apache.doris.common.AnalysisException;
2021
import org.apache.doris.job.exception.JobException;
2122
import org.apache.doris.job.extensions.insert.InsertTask;
2223
import org.apache.doris.qe.ConnectContext;
@@ -29,6 +30,83 @@
2930

3031
public class StreamingJobPropertiesTest {
3132

33+
/**
34+
* Simulate FE restart: constructor is called without validate().
35+
* Before the fix, maxIntervalSecond would be 0 when properties is non-empty,
36+
* causing streaming tasks to timeout immediately after FE restart.
37+
*/
38+
@Test
39+
public void testConstructorParsesPropertiesWithoutValidate() {
40+
// Case 1: empty properties -> should use defaults
41+
StreamingJobProperties emptyProps = new StreamingJobProperties(new HashMap<>());
42+
Assert.assertEquals(StreamingJobProperties.DEFAULT_MAX_INTERVAL_SECOND,
43+
emptyProps.getMaxIntervalSecond());
44+
Assert.assertEquals(StreamingJobProperties.DEFAULT_MAX_S3_BATCH_FILES,
45+
emptyProps.getS3BatchFiles());
46+
Assert.assertEquals(StreamingJobProperties.DEFAULT_MAX_S3_BATCH_BYTES,
47+
emptyProps.getS3BatchBytes());
48+
49+
// Case 2: explicit max_interval=1 (the bug scenario)
50+
// Before fix: maxIntervalSecond would be 0 because isEmpty()=false skipped defaults
51+
HashMap<String, String> props = new HashMap<>();
52+
props.put("max_interval", "1");
53+
StreamingJobProperties customProps = new StreamingJobProperties(props);
54+
Assert.assertEquals(1L, customProps.getMaxIntervalSecond());
55+
56+
// Case 3: explicit max_interval=5
57+
HashMap<String, String> props2 = new HashMap<>();
58+
props2.put("max_interval", "5");
59+
StreamingJobProperties customProps2 = new StreamingJobProperties(props2);
60+
Assert.assertEquals(5L, customProps2.getMaxIntervalSecond());
61+
// s3 properties not set -> should use defaults
62+
Assert.assertEquals(StreamingJobProperties.DEFAULT_MAX_S3_BATCH_FILES,
63+
customProps2.getS3BatchFiles());
64+
}
65+
66+
/**
67+
* Constructor should be resilient to bad data (e.g. corrupted metadata),
68+
* falling back to defaults instead of throwing exceptions.
69+
*/
70+
@Test
71+
public void testConstructorHandlesBadValues() {
72+
// non-numeric value -> fallback to default
73+
HashMap<String, String> props = new HashMap<>();
74+
props.put("max_interval", "abc");
75+
StreamingJobProperties p = new StreamingJobProperties(props);
76+
Assert.assertEquals(StreamingJobProperties.DEFAULT_MAX_INTERVAL_SECOND,
77+
p.getMaxIntervalSecond());
78+
79+
// zero value -> fallback to default (must be >= 1)
80+
HashMap<String, String> props2 = new HashMap<>();
81+
props2.put("max_interval", "0");
82+
StreamingJobProperties p2 = new StreamingJobProperties(props2);
83+
Assert.assertEquals(StreamingJobProperties.DEFAULT_MAX_INTERVAL_SECOND,
84+
p2.getMaxIntervalSecond());
85+
86+
// negative value -> fallback to default
87+
HashMap<String, String> props3 = new HashMap<>();
88+
props3.put("max_interval", "-1");
89+
StreamingJobProperties p3 = new StreamingJobProperties(props3);
90+
Assert.assertEquals(StreamingJobProperties.DEFAULT_MAX_INTERVAL_SECOND,
91+
p3.getMaxIntervalSecond());
92+
}
93+
94+
/**
95+
* validate() should still reject bad values with AnalysisException,
96+
* keeping the strict check for job creation.
97+
*/
98+
@Test
99+
public void testValidateStillRejectsBadValues() {
100+
HashMap<String, String> props = new HashMap<>();
101+
props.put("max_interval", "0");
102+
StreamingJobProperties p = new StreamingJobProperties(props);
103+
// constructor fallback is fine
104+
Assert.assertEquals(StreamingJobProperties.DEFAULT_MAX_INTERVAL_SECOND,
105+
p.getMaxIntervalSecond());
106+
// but validate() should throw
107+
Assert.assertThrows(AnalysisException.class, p::validate);
108+
}
109+
32110
@Test
33111
public void testSessionVariables() throws JobException {
34112
//default
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
import org.apache.doris.regression.suite.ClusterOptions
19+
import org.awaitility.Awaitility
20+
21+
import static java.util.concurrent.TimeUnit.SECONDS
22+
23+
suite("test_streaming_mysql_job_restart_fe_with_props", "docker,mysql,external_docker,external_docker_mysql,nondatalake") {
24+
def jobName = "test_streaming_mysql_job_restart_fe_with_props"
25+
def options = new ClusterOptions()
26+
options.setFeNum(1)
27+
options.cloudMode = null
28+
29+
docker(options) {
30+
def currentDb = (sql "select database()")[0][0]
31+
def table1 = "restart_props_user_info"
32+
def mysqlDb = "test_cdc_db"
33+
34+
sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
35+
sql """drop table if exists ${currentDb}.${table1} force"""
36+
37+
String enabled = context.config.otherConfigs.get("enableJdbcTest")
38+
if (enabled != null && enabled.equalsIgnoreCase("true")) {
39+
String mysql_port = context.config.otherConfigs.get("mysql_57_port");
40+
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
41+
String s3_endpoint = getS3Endpoint()
42+
String bucket = getS3BucketName()
43+
String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
44+
45+
connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
46+
sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
47+
sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
48+
sql """CREATE TABLE ${mysqlDb}.${table1} (
49+
`name` varchar(200) NOT NULL,
50+
`age` int DEFAULT NULL,
51+
PRIMARY KEY (`name`)
52+
) ENGINE=InnoDB"""
53+
sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('A1', 1);"""
54+
sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('B1', 2);"""
55+
}
56+
57+
// Create job with explicit max_interval property.
58+
// Before the fix, after FE restart the max_interval would not be parsed
59+
// from properties in the constructor, causing timeoutMs=0 and every task
60+
// to timeout immediately.
61+
sql """CREATE JOB ${jobName}
62+
PROPERTIES("max_interval" = "5")
63+
ON STREAMING
64+
FROM MYSQL (
65+
"jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
66+
"driver_url" = "${driver_url}",
67+
"driver_class" = "com.mysql.cj.jdbc.Driver",
68+
"user" = "root",
69+
"password" = "123456",
70+
"database" = "${mysqlDb}",
71+
"include_tables" = "${table1}",
72+
"offset" = "initial"
73+
)
74+
TO DATABASE ${currentDb} (
75+
"table.create.properties.replication_num" = "1"
76+
)
77+
"""
78+
79+
// Wait for snapshot data to be loaded
80+
try {
81+
Awaitility.await().atMost(300, SECONDS)
82+
.pollInterval(1, SECONDS).until(
83+
{
84+
def jobSucceedCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
85+
log.info("jobSucceedCount: " + jobSucceedCount)
86+
jobSucceedCount.size() == 1 && '2' <= jobSucceedCount.get(0).get(0)
87+
}
88+
)
89+
} catch (Exception ex) {
90+
def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
91+
def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
92+
log.info("show job: " + showjob)
93+
log.info("show task: " + showtask)
94+
throw ex;
95+
}
96+
97+
def jobInfoBeforeRestart = sql """
98+
select loadStatistic, status, currentOffset from jobs("type"="insert") where Name='${jobName}'
99+
"""
100+
log.info("jobInfoBeforeRestart: " + jobInfoBeforeRestart)
101+
def loadStatBefore = parseJson(jobInfoBeforeRestart.get(0).get(0))
102+
assert loadStatBefore.scannedRows == 2
103+
assert jobInfoBeforeRestart.get(0).get(1) == "RUNNING"
104+
105+
// Restart FE
106+
cluster.restartFrontends()
107+
sleep(60000)
108+
context.reconnectFe()
109+
110+
// Insert new data after restart
111+
connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
112+
sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('C1', 3);"""
113+
}
114+
115+
// Verify new data gets consumed after restart.
116+
// Before the fix, timeoutMs=0 caused every task to timeout immediately,
117+
// so the job would be stuck in PAUSED and never consume new data.
118+
try {
119+
Awaitility.await().atMost(120, SECONDS)
120+
.pollInterval(2, SECONDS).until(
121+
{
122+
def loadStat = sql """ select loadStatistic from jobs("type"="insert") where Name = '${jobName}' """
123+
def stat = parseJson(loadStat.get(0).get(0))
124+
log.info("scannedRows after restart: " + stat.scannedRows)
125+
stat.scannedRows == 3
126+
}
127+
)
128+
} catch (Exception ex) {
129+
def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
130+
def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
131+
log.info("show job after restart insert: " + showjob)
132+
log.info("show task after restart insert: " + showtask)
133+
throw ex;
134+
}
135+
136+
def result = sql """select * from ${currentDb}.${table1} order by name"""
137+
log.info("final result: " + result)
138+
assert result.size() == 3
139+
140+
sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
141+
}
142+
}
143+
}

0 commit comments

Comments (0)