Skip to content

Commit 9ecfd40

Browse files
authored
[fix](job) fix StreamingJob loadStatistic reset to zero after FE checkpoint restart (#61997)
### What problem does this PR solve?

Problem Summary:

1. When FE restarts from a checkpoint image, `replayOnCommitted()` is not called for transactions committed before the checkpoint. Because `jobStatistic` in `StreamingInsertJob` lacked `@SerializedName`, it was not written into the image, so `scannedRows` and `loadBytes` were reset to zero after every checkpoint-based restart.
2. Add a restart test case for the CDC TVF.

#### Root Cause

`jobStatistic` accumulates statistics via `+=` in `replayOnCommitted()`. Unlike `offset` (which is a plain assignment and therefore idempotent), accumulated values cannot be recovered by replaying only the post-checkpoint transactions. The field must be persisted in the image to survive checkpoint restarts.
1 parent f5783c0 commit 9ecfd40

7 files changed

Lines changed: 313 additions & 25 deletions

File tree

fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
108108
public static final String JOB_FILE_CATALOG = "streaming_job";
109109
private long dbId;
110110
// Streaming job statistics, all persisted in txn attachment
111+
// Must also be serialized into the checkpoint image: txns committed before the checkpoint are not replayed after an image-based restart
112+
@SerializedName("jstc")
111113
private StreamingJobStatistic jobStatistic = new StreamingJobStatistic();
112114
// Non-txn persisted statistics, used for streaming multi task
113115
@Getter

regression-test/data/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_pause_resume.out

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ B1 2
66
-- !final_data --
77
A1 1
88
B1 2
9-
C1 30
9+
C1 3
1010
D1 4
1111
E1 5
1212
F1 6
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !final_data --
3+
A1 1
4+
B1 2
5+
C1 3
6+
D1 4
7+
E1 5
8+
F1 6
9+
G1 7
10+
H1 8
11+
I1 9
12+

regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_latest_alter_cred.groovy

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,10 @@ suite("test_streaming_job_cdc_stream_postgres_latest_alter_cred",
5858

5959
sql """
6060
CREATE TABLE IF NOT EXISTS ${currentDb}.${dorisTable} (
61-
`name` varchar(200) NOT NULL,
61+
`name` varchar(200) NULL,
6262
`age` int NULL
6363
) ENGINE=OLAP
64-
UNIQUE KEY(`name`)
64+
DUPLICATE KEY(`name`)
6565
DISTRIBUTED BY HASH(`name`) BUCKETS AUTO
6666
PROPERTIES ("replication_allocation" = "tag.location.default: 1")
6767
"""

regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_pause_resume.groovy

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import static java.util.concurrent.TimeUnit.SECONDS
2727
* 1. Snapshot phase: pre-existing rows (A1, B1) are synced via full snapshot.
2828
* 2. Binlog phase begins: INSERT (C1, D1) are applied; job enters steady binlog state.
2929
* 3. Job is paused; verify status stays PAUSED and currentOffset / endOffset are non-empty.
30-
* 4. While paused: INSERT (E1, F1) and UPDATE C1.age → 30 into the PG source table.
30+
* 4. While paused: INSERT (E1, F1) into the PG source table.
3131
* 5. Job is resumed; verify status transitions to RUNNING.
3232
* 6. Verify all rows inserted before and during pause eventually appear in Doris (no data loss).
3333
* 7. Verify FailedTaskCount == 0 and no error message after resume.
@@ -45,13 +45,12 @@ suite("test_streaming_job_cdc_stream_postgres_pause_resume", "p0,external,pg,ext
4545
sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
4646
sql """drop table if exists ${currentDb}.${dorisTable} force"""
4747

48-
// Use UNIQUE key so that UPDATE rows overwrite in Doris
4948
sql """
5049
CREATE TABLE IF NOT EXISTS ${currentDb}.${dorisTable} (
51-
`name` varchar(200) NOT NULL,
50+
`name` varchar(200) NULL,
5251
`age` int NULL
5352
) ENGINE=OLAP
54-
UNIQUE KEY(`name`)
53+
DUPLICATE KEY(`name`)
5554
DISTRIBUTED BY HASH(`name`) BUCKETS AUTO
5655
PROPERTIES ("replication_allocation" = "tag.location.default: 1")
5756
"""
@@ -112,6 +111,8 @@ suite("test_streaming_job_cdc_stream_postgres_pause_resume", "p0,external,pg,ext
112111
connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
113112
sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) VALUES ('C1', 3)"""
114113
sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) VALUES ('D1', 4)"""
114+
def xminResult = sql """SELECT xmin, xmax , * FROM ${pgDB}.${pgSchema}.${pgTable} WHERE name = 'D1'; """
115+
log.info("xminResult: " + xminResult)
115116
}
116117

117118
try {
@@ -169,11 +170,10 @@ suite("test_streaming_job_cdc_stream_postgres_pause_resume", "p0,external,pg,ext
169170

170171
// ── Phase 6: DML while job is paused ─────────────────────────────────────────
171172
connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
172-
// new rows added while paused
173173
sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) VALUES ('E1', 5)"""
174174
sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) VALUES ('F1', 6)"""
175-
// update an existing row while paused; after resume Doris (UNIQUE) should reflect new value
176-
sql """UPDATE ${pgDB}.${pgSchema}.${pgTable} SET age = 30 WHERE name = 'C1'"""
175+
def xminResult = sql """SELECT xmin, xmax , * FROM ${pgDB}.${pgSchema}.${pgTable} WHERE name = 'F1'; """
176+
log.info("xminResult: " + xminResult)
177177
}
178178

179179
// ── Phase 7: resume the job ────────────────────────────────────────────────────
@@ -204,19 +204,6 @@ suite("test_streaming_job_cdc_stream_postgres_pause_resume", "p0,external,pg,ext
204204
throw ex
205205
}
206206

207-
// wait for UPDATE on C1 to be applied (age=30 in Doris)
208-
try {
209-
Awaitility.await().atMost(60, SECONDS).pollInterval(2, SECONDS).until({
210-
def rows = sql """SELECT age FROM ${currentDb}.${dorisTable} WHERE name = 'C1'"""
211-
log.info("C1 age after resume: " + rows)
212-
rows.size() == 1 && (rows.get(0).get(0) as int) == 30
213-
})
214-
} catch (Exception ex) {
215-
log.info("job: " + (sql """select * from jobs("type"="insert") where Name='${jobName}'"""))
216-
log.info("tasks: " + (sql """select * from tasks("type"="insert") where JobName='${jobName}'"""))
217-
throw ex
218-
}
219-
220207
// ── Phase 9: assert final correctness ─────────────────────────────────────────
221208
qt_final_data """ SELECT * FROM ${currentDb}.${dorisTable} ORDER BY name """
222209

0 commit comments

Comments
 (0)