Skip to content

Commit 6e27f11

Browse files
authored
[improve](streaming-job) avoid potential OOM when reading large snapshot splits (#63833)
## Summary

- Default-skip flink-cdc's in-snapshot backfill on the from-to path so that large splits no longer accumulate the entire chunk plus the backfill stream in the fetcher's outputBuffer; from-to is at-least-once and tolerates the duplicates this introduces. The TVF path (job-driven and standalone) keeps the standard `false` default to preserve exactly-once via per-task offset commit.
- Expose `skip_snapshot_backfill` as a user-facing property with strict `true`/`false` validation on both the from-to (CREATE JOB) and TVF (SELECT FROM cdc_stream(...)) entry points.
- Fix snapshot completion under `pollWithoutBuffer`: a split is now marked complete only after its high-watermark event has been consumed (`splitState.getHighWatermark() != null`), not on the first non-empty fetcher batch. Without this fix, enabling the new default truncates any split larger than debezium's `max.batch.size` and yields an NPE on offset extraction.
- Read `streaming_task_timeout_multiplier` live in `StreamingMultiTblTask.isTimeout()` so that `admin set frontend config` affects already-running tasks, matching the `@ConfField(mutable=true)` contract.
1 parent aa68e4b commit 6e27f11

13 files changed

Lines changed: 418 additions & 9 deletions

File tree

fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public class DataSourceConfigKeys {
3838
public static final String SNAPSHOT_SPLIT_KEY = "snapshot_split_key";
3939
public static final String SNAPSHOT_PARALLELISM = "snapshot_parallelism";
4040
public static final String SNAPSHOT_PARALLELISM_DEFAULT = "1";
41+
public static final String SKIP_SNAPSHOT_BACKFILL = "skip_snapshot_backfill";
4142
// MySQL CDC client identity. Single value "5400" or range "5400-5408".
4243
public static final String SERVER_ID = "server_id";
4344
public static final String SSL_MODE = "ssl_mode";

fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public class DataSourceConfigValidator {
5050
DataSourceConfigKeys.EXCLUDE_TABLES,
5151
DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE,
5252
DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
53+
DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL,
5354
DataSourceConfigKeys.SSL_MODE,
5455
DataSourceConfigKeys.SSL_ROOTCERT,
5556
DataSourceConfigKeys.SLOT_NAME,
@@ -208,12 +209,21 @@ private static boolean isValidValue(String key, String value, String dataSourceT
208209
|| key.equals(DataSourceConfigKeys.SNAPSHOT_PARALLELISM)) {
209210
return isPositiveInt(value);
210211
}
212+
if (key.equals(DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL)) {
213+
return isValidBoolean(value);
214+
}
211215
if (key.equals(DataSourceConfigKeys.SERVER_ID)) {
212216
return parseServerIdRange(value) != null;
213217
}
214218
return true;
215219
}
216220

221+
// Strict boolean: only "true"/"false" (case-insensitive); Boolean.parseBoolean would
222+
// silently coerce typos like "yes" to false.
223+
public static boolean isValidBoolean(String value) {
224+
return "true".equalsIgnoreCase(value) || "false".equalsIgnoreCase(value);
225+
}
226+
217227
public static boolean isPositiveInt(String value) {
218228
if (value == null) {
219229
return false;

fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,8 @@ private void checkRequiredSourceProperties() {
276276
if (!sourceProperties.containsKey(DataSourceConfigKeys.OFFSET)) {
277277
sourceProperties.put(DataSourceConfigKeys.OFFSET, DataSourceConfigKeys.OFFSET_LATEST);
278278
}
279+
// from-to is at-least-once; default-skip in-snapshot backfill.
280+
sourceProperties.putIfAbsent(DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL, "true");
279281
}
280282

281283
private List<String> createTableIfNotExists() throws Exception {

fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ public class StreamingMultiTblTask extends AbstractStreamingTask {
8282
private long loadBytes = 0L;
8383
private long filteredRows = 0L;
8484
private long loadedRows = 0L;
85-
private long timeoutMs;
8685
private long runningBackendId;
8786

8887
public StreamingMultiTblTask(Long jobId,
@@ -103,7 +102,6 @@ public StreamingMultiTblTask(Long jobId,
103102
this.jobProperties = jobProperties;
104103
this.targetDb = targetDb;
105104
this.cloudCluster = cloudCluster;
106-
this.timeoutMs = Config.streaming_task_timeout_multiplier * jobProperties.getMaxIntervalSecond() * 1000L;
107105
}
108106

109107
@Override
@@ -327,6 +325,9 @@ public boolean isTimeout() {
327325
// It's still pending, waiting for scheduling.
328326
return false;
329327
}
328+
// Read multiplier live so config changes affect already-running tasks.
329+
long timeoutMs = Config.streaming_task_timeout_multiplier
330+
* jobProperties.getMaxIntervalSecond() * 1000L;
330331
long elapsed = System.currentTimeMillis() - startTimeMs;
331332
if (elapsed > timeoutMs) {
332333
log.info("Task {} timeout detected: elapsed={}ms, timeoutMs={}ms", taskId, elapsed, timeoutMs);

fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ private void validate(Map<String, String> properties) throws AnalysisException {
156156
}
157157
validatePositiveIntIfPresent(properties, DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE);
158158
validatePositiveIntIfPresent(properties, DataSourceConfigKeys.SNAPSHOT_PARALLELISM);
159+
validateBooleanIfPresent(properties, DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL);
159160
// TVF entrypoint shares server_id checks with the from-to path's validateSource.
160161
try {
161162
DataSourceConfigValidator.validateServerIdConfig(properties);
@@ -186,6 +187,17 @@ private static void validatePgIdentifierIfPresent(Map<String, String> properties
186187
}
187188
}
188189

190+
private static void validateBooleanIfPresent(Map<String, String> properties, String key)
191+
throws AnalysisException {
192+
String value = properties.get(key);
193+
if (value == null) {
194+
return;
195+
}
196+
if (!DataSourceConfigValidator.isValidBoolean(value)) {
197+
throw new AnalysisException("Invalid value for key '" + key + "': " + value);
198+
}
199+
}
200+
189201
private void generateFileStatus() {
190202
this.fileStatuses.clear();
191203
this.fileStatuses.add(new TBrokerFileStatus(URI, false, Integer.MAX_VALUE, false));

fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ private void buildStreamRecords(
168168
long elapsedTime = System.currentTimeMillis() - startTime;
169169
boolean timeoutReached = elapsedTime > Constants.POLL_SPLIT_RECORDS_TIMEOUTS;
170170
if (shouldStop(
171+
sourceReader,
171172
isSnapshotSplit,
172173
hasReceivedData,
173174
lastMessageIsHeartbeat,
@@ -315,6 +316,7 @@ private RecordWithMeta buildRecordResponse(
315316
boolean timeoutReached = elapsedTime > Constants.POLL_SPLIT_RECORDS_TIMEOUTS;
316317

317318
if (shouldStop(
319+
sourceReader,
318320
isSnapshotSplit,
319321
hasReceivedData,
320322
lastMessageIsHeartbeat,
@@ -484,6 +486,7 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception
484486
&& elapsedTime >= maxIntervalMillis;
485487

486488
if (shouldStop(
489+
sourceReader,
487490
isSnapshotSplit,
488491
scannedRows > 0,
489492
lastMessageIsHeartbeat,
@@ -615,18 +618,21 @@ public static boolean isHeartbeatEvent(SourceRecord record) {
615618
* @return true if should stop, false if should continue
616619
*/
617620
private boolean shouldStop(
621+
SourceReader sourceReader,
618622
boolean isSnapshotSplit,
619623
boolean hasData,
620624
boolean lastMessageIsHeartbeat,
621625
long elapsedTime,
622626
long maxIntervalMillis,
623627
boolean timeoutReached) {
624628

625-
// 1. Snapshot split with data: if no more data in queue, stop immediately (no need to wait
626-
// for timeout)
627-
// snapshot split will be written to the debezium queue all at once.
628-
// multiple snapshot splits are handled in the source reader.
629+
// Snapshot split: wait until every split has received its high-watermark event;
630+
// an empty poll alone is not a finish signal under pollWithoutBuffer where the
631+
// fetcher returns one ChangeEventQueue batch at a time.
629632
if (isSnapshotSplit) {
633+
if (!sourceReader.isSnapshotFinished()) {
634+
return false;
635+
}
630636
LOG.info(
631637
"Snapshot split finished, no more data available. Total elapsed: {} ms",
632638
elapsedTime);

fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException {
417417
OBJECT_MAPPER.readValue(loadResult, RespContent.class);
418418
if (DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
419419
long cacheByteBeforeFlush =
420-
currentCacheBytes.getAndAdd(-respContent.getLoadBytes());
420+
currentCacheBytes.getAndAdd(-buffer.getBufferSizeBytes());
421421
LOG.info(
422422
"load success, cacheBeforeFlushBytes: {}, currentCacheBytes : {}",
423423
cacheByteBeforeFlush,

fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,9 @@ private Iterator<SourceRecord> pollRecordsFromSnapshotReaders() throws Exception
468468
return Collections.emptyIterator();
469469
}
470470

471+
// A split is finished only after its high-watermark event has been consumed.
472+
refreshCompletedSplits();
473+
471474
if (completedSplitIds.size() >= snapshotReaderContexts.size()) {
472475
LOG.info("All {} snapshot splits have been completed", snapshotReaderContexts.size());
473476
return Collections.emptyIterator();
@@ -515,6 +518,11 @@ private void startParallelPolling() {
515518
Fetcher<SourceRecords, SourceSplitBase>,
516519
SnapshotSplitState>
517520
context = snapshotReaderContexts.get(index);
521+
// Skip splits already drained to high-watermark; otherwise their poll futures spin
522+
// returning null and starve siblings.
523+
if (completedSplitIds.contains(context.getSplit().splitId())) {
524+
continue;
525+
}
518526

519527
CompletableFuture<PollResult> future =
520528
CompletableFuture.supplyAsync(
@@ -569,11 +577,12 @@ private PollResult waitForAnyCompletion() throws Exception {
569577
snapshot.remove(future);
570578
PollResult result = future.get();
571579
if (result != null) {
580+
// Split completion is determined later by splitState.getHighWatermark()
581+
// != null, not by receiving a non-empty batch.
572582
LOG.info(
573583
"Got result from reader {}, {} futures remaining",
574584
result.context.getSplit().splitId(),
575585
snapshot.size());
576-
completedSplitIds.add(result.context.getSplit().splitId());
577586
return result;
578587
}
579588
// If result is null (no data), continue checking other futures
@@ -839,6 +848,35 @@ public Map<String, String> extractSnapshotStateOffset(Object splitState) {
839848
return offsetRes;
840849
}
841850

851+
@Override
852+
public boolean isSnapshotFinished() {
853+
if (snapshotReaderContexts.isEmpty()) {
854+
return true;
855+
}
856+
for (SnapshotReaderContext<
857+
org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit,
858+
Fetcher<SourceRecords, SourceSplitBase>,
859+
SnapshotSplitState>
860+
context : snapshotReaderContexts) {
861+
if (context.getSplitState().getHighWatermark() == null) {
862+
return false;
863+
}
864+
}
865+
return true;
866+
}
867+
868+
private void refreshCompletedSplits() {
869+
for (SnapshotReaderContext<
870+
org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit,
871+
Fetcher<SourceRecords, SourceSplitBase>,
872+
SnapshotSplitState>
873+
context : snapshotReaderContexts) {
874+
if (context.getSplitState().getHighWatermark() != null) {
875+
completedSplitIds.add(context.getSplit().splitId());
876+
}
877+
}
878+
}
879+
842880
@Override
843881
public Map<String, String> extractBinlogStateOffset(Object splitState) {
844882
Preconditions.checkNotNull(splitState, "splitState is null");

fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,4 +98,9 @@ default String serializeTableSchemas() {
9898
* indicate how far the source TX log can be discarded.
9999
*/
100100
default void commitSourceOffset(String jobId, SourceSplit sourceSplit) {}
101+
102+
/** Whether all snapshot splits have received their high-watermark event. */
103+
default boolean isSnapshotFinished() {
104+
return true;
105+
}
101106
}

fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,9 @@ private Iterator<SourceRecord> pollRecordsFromSnapshotReaders() throws Exception
489489
return Collections.emptyIterator();
490490
}
491491

492+
// A split is finished only after its high-watermark event has been consumed.
493+
refreshCompletedSplits();
494+
492495
if (completedSplitIds.size() >= snapshotReaderContexts.size()) {
493496
LOG.info("All {} snapshot splits have been completed", snapshotReaderContexts.size());
494497
return Collections.emptyIterator();
@@ -533,6 +536,11 @@ private void startParallelPolling() {
533536
final int index = i;
534537
SnapshotReaderContext<MySqlSnapshotSplit, SnapshotSplitReader, MySqlSnapshotSplitState>
535538
context = snapshotReaderContexts.get(index);
539+
// Skip splits already drained to high-watermark; otherwise their poll futures spin
540+
// returning null and starve siblings.
541+
if (completedSplitIds.contains(context.getSplit().splitId())) {
542+
continue;
543+
}
536544

537545
CompletableFuture<PollResult> future =
538546
CompletableFuture.supplyAsync(
@@ -587,11 +595,12 @@ private PollResult waitForAnyCompletion() throws Exception {
587595
snapshot.remove(future);
588596
PollResult result = future.get();
589597
if (result != null) {
598+
// Split completion is determined later by splitState.getHighWatermark()
599+
// != null, not by receiving a non-empty batch.
590600
LOG.info(
591601
"Got result from reader {}, {} futures remaining",
592602
result.context.getSplit().splitId(),
593603
snapshot.size());
594-
completedSplitIds.add(result.context.getSplit().splitId());
595604
return result;
596605
}
597606
// If result is null (no data), continue checking other futures
@@ -992,6 +1001,10 @@ private MySqlSourceConfig generateMySqlConfig(
9921001
objectPath, cdcConfig.get(DataSourceConfigKeys.SNAPSHOT_SPLIT_KEY));
9931002
}
9941003

1004+
// FE injects "true" on the from-to path; TVF leaves it absent → parseBoolean(null) gives the default false.
1005+
configFactory.skipSnapshotBackfill(
1006+
Boolean.parseBoolean(cdcConfig.get(DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL)));
1007+
9951008
return configFactory.createConfig(subtaskId);
9961009
}
9971010

@@ -1013,6 +1026,29 @@ public Map<String, String> extractSnapshotStateOffset(Object splitState) {
10131026
return new HashMap<>(highWatermark.getOffset());
10141027
}
10151028

1029+
@Override
1030+
public boolean isSnapshotFinished() {
1031+
if (snapshotReaderContexts.isEmpty()) {
1032+
return true;
1033+
}
1034+
for (SnapshotReaderContext<MySqlSnapshotSplit, SnapshotSplitReader, MySqlSnapshotSplitState>
1035+
context : snapshotReaderContexts) {
1036+
if (context.getSplitState().getHighWatermark() == null) {
1037+
return false;
1038+
}
1039+
}
1040+
return true;
1041+
}
1042+
1043+
private void refreshCompletedSplits() {
1044+
for (SnapshotReaderContext<MySqlSnapshotSplit, SnapshotSplitReader, MySqlSnapshotSplitState>
1045+
context : snapshotReaderContexts) {
1046+
if (context.getSplitState().getHighWatermark() != null) {
1047+
completedSplitIds.add(context.getSplit().splitId());
1048+
}
1049+
}
1050+
}
1051+
10161052
@Override
10171053
public Map<String, String> extractBinlogStateOffset(Object splitState) {
10181054
Preconditions.checkNotNull(splitState, "splitState is null");

0 commit comments

Comments
 (0)