Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -403,16 +403,21 @@ public void createScanRangeLocations() throws UserException {
if (inputSplits.isEmpty() && !isFileStreamType()) {
return;
}
Set<String> distinctFilePaths = new HashSet<>();
Multimap<Backend, Split> assignment = backendPolicy.computeScanRangeAssignment(inputSplits);
for (Backend backend : assignment.keySet()) {
Collection<Split> splits = assignment.get(backend);
for (Split split : splits) {
scanRangeLocations.add(splitToScanRange(backend, locationProperties, split, pathPartitionKeys,
admissionResult));
totalFileSize += split.getLength();
if (split instanceof FileSplit) {
distinctFilePaths.add(((FileSplit) split).getPathString());
}
}
scanBackendIds.add(backend.getId());
}
selectedFileNum = distinctFilePaths.size();
}

getSerializedTable().ifPresent(params::setSerializedTable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@
public abstract class FileScanNode extends ExternalScanNode {
// For explain
protected long totalFileSize = 0;
// -1 means unknown (batch-mode scans don't materialize splits on FE).
protected int selectedFileNum = -1;
protected long totalPartitionNum = 0;
// For display pushdown agg result
protected long tableLevelRowCount = -1;
Expand Down Expand Up @@ -113,6 +115,10 @@ public long getTotalFileSize() {
return totalFileSize;
}

    /**
     * Returns the number of distinct files selected by this scan.
     *
     * @return the selected file count, or {@code -1} when it is unknown
     *         (per the field's contract, batch-mode scans don't materialize
     *         splits on the FE, so no file count is available)
     */
    public int getSelectedFileNum() {
        return selectedFileNum;
    }

/**
* Get all delete files for the given file range.
* @param rangeDesc the file range descriptor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -610,8 +610,12 @@ private void applyInsertPlanStatistic(FastInsertIntoValuesPlanner planner) {
for (PlanFragment fragment : planner.getFragments()) {
if (fragment.getPlanRoot() instanceof FileScanNode) {
FileScanNode fileScanNode = (FileScanNode) fragment.getPlanRoot();
// Prefer distinct file count; fall back to split count for batch-mode scans.
int fileNum = fileScanNode.getSelectedFileNum() >= 0
? fileScanNode.getSelectedFileNum()
: (int) fileScanNode.getSelectedSplitNum();
Env.getCurrentEnv().getLoadManager().getLoadJob(getJobId())
.addLoadFileInfo((int) fileScanNode.getSelectedSplitNum(), fileScanNode.getTotalFileSize());
.addLoadFileInfo(fileNum, fileScanNode.getTotalFileSize());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select --
1 Emily 25
2 Benjamin 35
3 Olivia 28
4 Alexander 60
5 Ava 17
6 William 69
7 Sophia 32
8 James 64
9 Emma 37
10 Liam 64
11 Alexander 34
12 Isabella 43
13 Benjamin 56
14 Sophia 12
15 Christopher 33
16 Emma 23
17 Michael 11
18 Olivia 38
19 Daniel 19
20 Ava 28

Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.


import org.awaitility.Awaitility

import static java.util.concurrent.TimeUnit.SECONDS

// fileNumber must be physical file count, not BE split count. Tiny split_size forces multi-split per file.
// Regression test: job-level loadStatistic.fileNumber must be the physical file
// count, not the BE split count. A tiny session.max_initial_file_split_size
// forces each file to be cut into multiple splits, so the two counts diverge
// and the assertions below can tell them apart.
suite("test_streaming_insert_job_file_number") {
    def tableName = "test_streaming_insert_job_file_number_tbl"
    def jobName = "test_streaming_insert_job_file_number_name"

    // Clean up any leftovers from a previous run so the test is repeatable.
    sql """drop table if exists `${tableName}` force"""
    sql """
        DROP JOB IF EXISTS where jobname = '${jobName}'
    """

    // Target table for the streamed rows.
    sql """
        CREATE TABLE IF NOT EXISTS ${tableName} (
        `c1` int NULL,
        `c2` string NULL,
        `c3` int NULL
        ) ENGINE=OLAP
        DUPLICATE KEY(`c1`)
        COMMENT 'OLAP'
        DISTRIBUTED BY HASH(`c1`) BUCKETS 3
        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
    """

    // Streaming insert job over two CSV files (example_0.csv / example_1.csv).
    // "session.max_initial_file_split_size" = "50" makes each file produce
    // several splits, which is the condition under test.
    sql """
        CREATE JOB ${jobName}
        PROPERTIES(
        "s3.max_batch_files" = "2",
        "session.max_initial_file_split_size" = "50"
        )
        ON STREAMING DO INSERT INTO ${tableName}
        SELECT * FROM S3
        (
        "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
        "format" = "csv",
        "provider" = "${getS3Provider()}",
        "column_separator" = ",",
        "s3.endpoint" = "${getS3Endpoint()}",
        "s3.region" = "${getS3Region()}",
        "s3.access_key" = "${getS3AK()}",
        "s3.secret_key" = "${getS3SK()}"
        );
    """

    try {
        // Wait (up to 300s) until the job reports at least one succeeded
        // streaming task; polling every second.
        Awaitility.await().atMost(300, SECONDS)
            .pollInterval(1, SECONDS).until(
                {
                    def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
                    log.info("jobSuccendCount: " + jobSuccendCount)
                    jobSuccendCount.size() == 1 && '1' <= jobSuccendCount.get(0).get(0)
                }
            )
    } catch (Exception ex) {
        // Dump job and task state to the log for post-mortem debugging,
        // then re-throw so the suite still fails.
        def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
        def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
        log.info("show job: " + showjob)
        log.info("show task: " + showtask)
        throw ex;
    }

    // Pause the job so its statistics are stable before we read them.
    sql """ PAUSE JOB where jobname = '${jobName}' """

    // Verify the aggregated job-level load statistics.
    def jobInfo = sql """
        select loadStatistic from jobs("type"="insert") where Name='${jobName}'
    """
    log.info("jobInfo: " + jobInfo)
    def loadStat = parseJson(jobInfo.get(0).get(0))
    assert loadStat.scannedRows == 20
    assert loadStat.loadBytes == 425
    assert loadStat.fileSize == 256
    // Without fix this is BE split count (6); with fix it's physical file count (2).
    assert loadStat.fileNumber == 2

    // Verify the per-task statistics as well: the most recent task must have
    // succeeded and report the physical file count.
    def taskInfo = sql """
        select Status, LoadStatistic from tasks("type"="insert") where JobName='${jobName}'
    """
    log.info("taskInfo: " + taskInfo)
    assert taskInfo.size() > 0
    def lastTask = taskInfo.get(taskInfo.size() - 1)
    assert lastTask.get(0) == "SUCCESS"
    def taskStat = parseJson(lastTask.get(1))
    assert taskStat.FileNumber == 2

    // Check the actual loaded data against the recorded .out result.
    qt_select """ SELECT * FROM ${tableName} order by c1 """

    // Tear down: remove the job and the table.
    sql """
        DROP JOB IF EXISTS where jobname = '${jobName}'
    """
    sql """drop table if exists `${tableName}` force"""
}
Loading