diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java index d8271994449657..e99db552c6b686 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java @@ -403,6 +403,7 @@ public void createScanRangeLocations() throws UserException { if (inputSplits.isEmpty() && !isFileStreamType()) { return; } + Set distinctFilePaths = new HashSet<>(); Multimap assignment = backendPolicy.computeScanRangeAssignment(inputSplits); for (Backend backend : assignment.keySet()) { Collection splits = assignment.get(backend); @@ -410,9 +411,13 @@ public void createScanRangeLocations() throws UserException { scanRangeLocations.add(splitToScanRange(backend, locationProperties, split, pathPartitionKeys, admissionResult)); totalFileSize += split.getLength(); + if (split instanceof FileSplit) { + distinctFilePaths.add(((FileSplit) split).getPathString()); + } } scanBackendIds.add(backend.getId()); } + selectedFileNum = distinctFilePaths.size(); } getSerializedTable().ifPresent(params::setSerializedTable); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java index 82504fc6db3439..e36ebefe646eb2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java @@ -69,6 +69,8 @@ public abstract class FileScanNode extends ExternalScanNode { // For explain protected long totalFileSize = 0; + // -1 means unknown (batch-mode scans don't materialize splits on FE). + protected int selectedFileNum = -1; protected long totalPartitionNum = 0; // For display pushdown agg result protected long tableLevelRowCount = -1; @@ -113,6 +115,10 @@ public long getTotalFileSize() { return totalFileSize; } + public int getSelectedFileNum() { + return selectedFileNum; + } + /** * Get all delete files for the given file range. * @param rangeDesc the file range descriptor diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index f12bc8cb11f7cc..7046f0022552ae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -610,8 +610,12 @@ private void applyInsertPlanStatistic(FastInsertIntoValuesPlanner planner) { for (PlanFragment fragment : planner.getFragments()) { if (fragment.getPlanRoot() instanceof FileScanNode) { FileScanNode fileScanNode = (FileScanNode) fragment.getPlanRoot(); + // Prefer distinct file count; fall back to split count for batch-mode scans. + int fileNum = fileScanNode.getSelectedFileNum() >= 0 + ? fileScanNode.getSelectedFileNum() + : (int) fileScanNode.getSelectedSplitNum(); Env.getCurrentEnv().getLoadManager().getLoadJob(getJobId()) - .addLoadFileInfo((int) fileScanNode.getSelectedSplitNum(), fileScanNode.getTotalFileSize()); + .addLoadFileInfo(fileNum, fileScanNode.getTotalFileSize()); } } } diff --git a/regression-test/data/job_p0/streaming_job/test_streaming_insert_job_file_number.out b/regression-test/data/job_p0/streaming_job/test_streaming_insert_job_file_number.out new file mode 100644 index 00000000000000..bf3c8f2571c273 --- /dev/null +++ b/regression-test/data/job_p0/streaming_job/test_streaming_insert_job_file_number.out @@ -0,0 +1,23 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select -- +1 Emily 25 +2 Benjamin 35 +3 Olivia 28 +4 Alexander 60 +5 Ava 17 +6 William 69 +7 Sophia 32 +8 James 64 +9 Emma 37 +10 Liam 64 +11 Alexander 34 +12 Isabella 43 +13 Benjamin 56 +14 Sophia 12 +15 Christopher 33 +16 Emma 23 +17 Michael 11 +18 Olivia 38 +19 Daniel 19 +20 Ava 28 + diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_file_number.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_file_number.groovy new file mode 100644 index 00000000000000..906fdd752ade16 --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_file_number.groovy @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +// fileNumber must be physical file count, not BE split count. Tiny split_size forces multi-split per file. +suite("test_streaming_insert_job_file_number") { + def tableName = "test_streaming_insert_job_file_number_tbl" + def jobName = "test_streaming_insert_job_file_number_name" + + sql """drop table if exists `${tableName}` force""" + sql """ + DROP JOB IF EXISTS where jobname = '${jobName}' + """ + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `c1` int NULL, + `c2` string NULL, + `c3` int NULL + ) ENGINE=OLAP + DUPLICATE KEY(`c1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`c1`) BUCKETS 3 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); + """ + + sql """ + CREATE JOB ${jobName} + PROPERTIES( + "s3.max_batch_files" = "2", + "session.max_initial_file_split_size" = "50" + ) + ON STREAMING DO INSERT INTO ${tableName} + SELECT * FROM S3 + ( + "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv", + "format" = "csv", + "provider" = "${getS3Provider()}", + "column_separator" = ",", + "s3.endpoint" = "${getS3Endpoint()}", + "s3.region" = "${getS3Region()}", + "s3.access_key" = "${getS3AK()}", + "s3.secret_key" = "${getS3SK()}" + ); + """ + + try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(1, SECONDS).until( + { + def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """ + log.info("jobSuccendCount: " + jobSuccendCount) + jobSuccendCount.size() == 1 && '1' <= jobSuccendCount.get(0).get(0) + } + ) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex; + } + + sql """ PAUSE JOB where jobname = '${jobName}' """ + + def jobInfo = sql """ + select loadStatistic from jobs("type"="insert") where Name='${jobName}' + """ + log.info("jobInfo: " + jobInfo) + def loadStat = parseJson(jobInfo.get(0).get(0)) + assert loadStat.scannedRows == 20 + assert loadStat.loadBytes == 425 + assert loadStat.fileSize == 256 + // Without fix this is BE split count (6); with fix it's physical file count (2). + assert loadStat.fileNumber == 2 + + def taskInfo = sql """ + select Status, LoadStatistic from tasks("type"="insert") where JobName='${jobName}' + """ + log.info("taskInfo: " + taskInfo) + assert taskInfo.size() > 0 + def lastTask = taskInfo.get(taskInfo.size() - 1) + assert lastTask.get(0) == "SUCCESS" + def taskStat = parseJson(lastTask.get(1)) + assert taskStat.FileNumber == 2 + + qt_select """ SELECT * FROM ${tableName} order by c1 """ + + sql """ + DROP JOB IF EXISTS where jobname = '${jobName}' + """ + sql """drop table if exists `${tableName}` force""" +}