Skip to content

Commit 4ab6261

Browse files
authored
branch-4.0: [fix](nereids) bind file column placeholders for copy into select (#64590)
pick #64395
1 parent ef4582d commit 4ab6261

3 files changed

Lines changed: 190 additions & 23 deletions

File tree

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyFromDesc.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.doris.nereids.trees.plans.commands.info;
1919

2020
import org.apache.doris.analysis.CopyFromParam;
21-
import org.apache.doris.analysis.SlotRef;
2221
import org.apache.doris.analysis.StageAndPattern;
2322
import org.apache.doris.analysis.TableName;
2423
import org.apache.doris.catalog.Column;
@@ -203,23 +202,33 @@ private boolean getFileColumnNames(boolean addDeleteSign) throws AnalysisExcepti
203202
if (exprList == null) {
204203
return false;
205204
}
206-
List<SlotRef> slotRefs = Lists.newArrayList();
207-
// Expr.collectList(exprList, SlotRef.class, slotRefs);
205+
boolean hasFileColumnPlaceholder = false;
208206
Set<String> columnSet = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
209-
for (SlotRef slotRef : slotRefs) {
210-
String columnName = slotRef.getColumnName();
211-
if (columnName.startsWith(DOLLAR)) {
212-
if (fileColumns.size() > 0) {
207+
List<Expression> fileColumnExpressions = exprList.stream().map(expr -> (Expression) expr)
208+
.collect(Collectors.toList());
209+
fileFilterExpr.ifPresent(fileColumnExpressions::add);
210+
for (Expression expr : fileColumnExpressions) {
211+
for (UnboundSlot slot : expr.<UnboundSlot>collectToList(UnboundSlot.class::isInstance)) {
212+
String columnName = slot.getName();
213+
if (columnName.startsWith(DOLLAR)) {
214+
if (!fileColumns.isEmpty()) {
215+
throw new AnalysisException("can not mix column name and dollar sign");
216+
}
217+
hasFileColumnPlaceholder = true;
218+
continue;
219+
}
220+
if (hasFileColumnPlaceholder) {
213221
throw new AnalysisException("can not mix column name and dollar sign");
214222
}
215-
return false;
216-
}
217-
if (columnSet.add(columnName)) {
218-
fileColumns.add(columnName);
223+
if (columnSet.add(columnName)) {
224+
fileColumns.add(columnName);
225+
}
219226
}
220227
}
228+
if (hasFileColumnPlaceholder) {
229+
return false;
230+
}
221231
if (addDeleteSign) {
222-
// exprList.add(new SlotRef(null, Column.DELETE_SIGN));
223232
fileColumns.add(Column.DELETE_SIGN);
224233
}
225234
return true;

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyIntoInfo.java

Lines changed: 104 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.doris.nereids.CascadesContext;
5454
import org.apache.doris.nereids.analyzer.Scope;
5555
import org.apache.doris.nereids.analyzer.UnboundRelation;
56+
import org.apache.doris.nereids.analyzer.UnboundSlot;
5657
import org.apache.doris.nereids.glue.translator.ExpressionTranslator;
5758
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
5859
import org.apache.doris.nereids.jobs.executor.Analyzer;
@@ -62,6 +63,8 @@
6263
import org.apache.doris.nereids.rules.analysis.ExpressionAnalyzer;
6364
import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext;
6465
import org.apache.doris.nereids.trees.expressions.Cast;
66+
import org.apache.doris.nereids.trees.expressions.EqualTo;
67+
import org.apache.doris.nereids.trees.expressions.ExprId;
6568
import org.apache.doris.nereids.trees.expressions.Expression;
6669
import org.apache.doris.nereids.trees.expressions.Slot;
6770
import org.apache.doris.nereids.trees.expressions.SlotReference;
@@ -70,6 +73,8 @@
7073
import org.apache.doris.nereids.trees.plans.algebra.OlapScan;
7174
import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
7275
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
76+
import org.apache.doris.nereids.types.DataType;
77+
import org.apache.doris.nereids.types.StringType;
7378
import org.apache.doris.nereids.util.Utils;
7479
import org.apache.doris.qe.ConnectContext;
7580
import org.apache.doris.qe.OriginStatement;
@@ -239,8 +244,10 @@ public void doValidate(String user, String db, boolean checkAuth) throws Analysi
239244
}
240245
PlanTranslatorContext context = new PlanTranslatorContext(cascadesContext);
241246
List<Slot> slots = boundRelation.getOutput();
242-
Scope scope = new Scope(slots);
243-
ExpressionAnalyzer analyzer = new ExpressionAnalyzer(null, scope, cascadesContext, false, false);
247+
CopyIntoFileSlots fileSlots = new CopyIntoFileSlots(slots, copyFromDesc.getFileColumns(),
248+
copyFromDesc.getColumnMappingList());
249+
ExpressionAnalyzer analyzer = new ExpressionAnalyzer(null, new Scope(fileSlots.getScopeSlots()),
250+
cascadesContext, false, false);
244251

245252
Map<SlotReference, SlotRef> translateMap = Maps.newHashMap();
246253

@@ -257,13 +264,14 @@ public void doValidate(String user, String db, boolean checkAuth) throws Analysi
257264
if (copyFromDesc.getColumnMappingList() != null && !copyFromDesc.getColumnMappingList().isEmpty()) {
258265
legacyColumnMappingList = new ArrayList<>();
259266
for (Expression expression : copyFromDesc.getColumnMappingList()) {
260-
legacyColumnMappingList.add(translateToLegacyExpr(expression, analyzer, context, cascadesContext));
267+
legacyColumnMappingList.add(translateToLegacyExpr(expression, analyzer, context, cascadesContext,
268+
fileSlots));
261269
}
262270
}
263271
Expr legacyFileFilterExpr = null;
264272
if (copyFromDesc.getFileFilterExpr().isPresent()) {
265273
legacyFileFilterExpr = translateToLegacyExpr(copyFromDesc.getFileFilterExpr().get(),
266-
analyzer, context, cascadesContext);
274+
analyzer, context, cascadesContext, fileSlots);
267275
}
268276

269277
String compression = copyIntoProperties.getCompression();
@@ -301,30 +309,32 @@ public void doValidate(String user, String db, boolean checkAuth) throws Analysi
301309
}
302310

303311
// translate copy from description to copy from param
304-
legacyCopyFromParam = toLegacyParam(copyFromDesc, analyzer, context, cascadesContext);
312+
legacyCopyFromParam = toLegacyParam(copyFromDesc, analyzer, context, cascadesContext, fileSlots);
305313
}
306314

307315
private CopyFromParam toLegacyParam(CopyFromDesc copyFromDesc, ExpressionAnalyzer analyzer,
308-
PlanTranslatorContext context, CascadesContext cascadesContext) {
316+
PlanTranslatorContext context, CascadesContext cascadesContext,
317+
CopyIntoFileSlots fileSlots) {
309318
StageAndPattern stageAndPattern = copyFromDesc.getStageAndPattern();
310319
List<Expr> exprList = null;
311320
if (copyFromDesc.getExprList() != null) {
312321
exprList = new ArrayList<>();
313322
for (Expression expression : copyFromDesc.getExprList()) {
314-
exprList.add(translateToLegacyExpr(expression, analyzer, context, cascadesContext));
323+
exprList.add(translateToLegacyExpr(expression, analyzer, context, cascadesContext, fileSlots));
315324
}
316325
}
317326
Expr fileFilterExpr = null;
318327
if (copyFromDesc.getFileFilterExpr().isPresent()) {
319328
fileFilterExpr = translateToLegacyExpr(copyFromDesc.getFileFilterExpr().get(),
320-
analyzer, context, cascadesContext);
329+
analyzer, context, cascadesContext, fileSlots);
321330
}
322331
List<String> fileColumns = copyFromDesc.getFileColumns();
323332
List<Expr> columnMappingList = null;
324333
if (copyFromDesc.getColumnMappingList() != null) {
325334
columnMappingList = new ArrayList<>();
326335
for (Expression expression : copyFromDesc.getColumnMappingList()) {
327-
columnMappingList.add(translateToLegacyExpr(expression, analyzer, context, cascadesContext));
336+
columnMappingList.add(translateToLegacyExpr(expression, analyzer, context, cascadesContext,
337+
fileSlots));
328338
}
329339
}
330340
List<String> targetColumns = copyFromDesc.getTargetColumns();
@@ -333,7 +343,7 @@ private CopyFromParam toLegacyParam(CopyFromDesc copyFromDesc, ExpressionAnalyze
333343
}
334344

335345
private Expr translateToLegacyExpr(Expression expr, ExpressionAnalyzer analyzer, PlanTranslatorContext context,
336-
CascadesContext cascadesContext) {
346+
CascadesContext cascadesContext, CopyIntoFileSlots fileSlots) {
337347
Expression expression;
338348
try {
339349
expression = analyzer.analyze(expr, new ExpressionRewriteContext(cascadesContext));
@@ -342,11 +352,26 @@ private Expr translateToLegacyExpr(Expression expr, ExpressionAnalyzer analyzer,
342352
+ expr.toSql() + "', "
343353
+ Utils.convertFirstChar(e.getMessage()));
344354
}
345-
ExpressionToExpr translator = new ExpressionToExpr();
355+
ExpressionToExpr translator = new ExpressionToExpr(fileSlots);
346356
return expression.accept(translator, context);
347357
}
348358

349359
private static class ExpressionToExpr extends ExpressionTranslator {
360+
private final CopyIntoFileSlots fileSlots;
361+
362+
/**
 * Creates a translator that consults {@code fileSlots} when translating slot references.
 *
 * @param fileSlots lookup used by {@code visitSlotReference} to map an ExprId to a file slot name
 */
private ExpressionToExpr(CopyIntoFileSlots fileSlots) {
363+
this.fileSlots = fileSlots;
364+
}
365+
366+
@Override
367+
public Expr visitSlotReference(SlotReference slotReference, PlanTranslatorContext context) {
368+
String fileSlotName = fileSlots.getFileSlotName(slotReference.getExprId());
369+
if (fileSlotName != null) {
370+
return new SlotRef(null, fileSlotName);
371+
}
372+
return super.visitSlotReference(slotReference, context);
373+
}
374+
350375
@Override
351376
public Expr visitCast(Cast cast, PlanTranslatorContext context) {
352377
// left child of cast is target type, right child of cast is expression
@@ -355,6 +380,74 @@ public Expr visitCast(Cast cast, PlanTranslatorContext context) {
355380
}
356381
}
357382

383+
private static class CopyIntoFileSlots {
384+
private final List<Slot> scopeSlots;
385+
private final Map<ExprId, String> fileSlotNames = Maps.newHashMap();
386+
387+
private CopyIntoFileSlots(List<Slot> targetSlots, List<String> fileColumns,
388+
List<Expression> columnMappingList) {
389+
scopeSlots = new ArrayList<>(targetSlots);
390+
if (fileColumns == null) {
391+
return;
392+
}
393+
Map<String, DataType> targetColumnTypes = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
394+
for (Slot slot : targetSlots) {
395+
targetColumnTypes.put(slot.getName(), slot.getDataType());
396+
}
397+
Map<String, DataType> fileColumnTypes = inferFileColumnTypes(targetColumnTypes, columnMappingList);
398+
for (String fileColumn : fileColumns) {
399+
if (!isFileColumnPlaceholder(fileColumn) || fileSlotNames.containsValue(fileColumn)) {
400+
continue;
401+
}
402+
SlotReference slot = new SlotReference(fileColumn,
403+
fileColumnTypes.getOrDefault(fileColumn, StringType.INSTANCE), true);
404+
scopeSlots.add(slot);
405+
fileSlotNames.put(slot.getExprId(), fileColumn);
406+
}
407+
}
408+
409+
private List<Slot> getScopeSlots() {
410+
return scopeSlots;
411+
}
412+
413+
private String getFileSlotName(ExprId exprId) {
414+
return fileSlotNames.get(exprId);
415+
}
416+
417+
private static boolean isFileColumnPlaceholder(String columnName) {
418+
return columnName != null && columnName.startsWith("$");
419+
}
420+
421+
private static Map<String, DataType> inferFileColumnTypes(Map<String, DataType> targetColumnTypes,
422+
List<Expression> columnMappingList) {
423+
Map<String, DataType> fileColumnTypes = Maps.newHashMap();
424+
if (columnMappingList == null) {
425+
return fileColumnTypes;
426+
}
427+
for (Expression expression : columnMappingList) {
428+
if (!(expression instanceof EqualTo)) {
429+
continue;
430+
}
431+
EqualTo columnMapping = (EqualTo) expression;
432+
if (!(columnMapping.left() instanceof UnboundSlot)) {
433+
continue;
434+
}
435+
DataType targetType = targetColumnTypes.get(((UnboundSlot) columnMapping.left()).getName());
436+
if (targetType == null) {
437+
continue;
438+
}
439+
for (UnboundSlot fileColumn : columnMapping.right()
440+
.<UnboundSlot>collect(UnboundSlot.class::isInstance)) {
441+
String fileColumnName = fileColumn.getName();
442+
if (isFileColumnPlaceholder(fileColumnName)) {
443+
fileColumnTypes.putIfAbsent(fileColumnName, targetType);
444+
}
445+
}
446+
}
447+
return fileColumnTypes;
448+
}
449+
}
450+
358451
// after validateStagePB, fileFormat and copyOption is not null
359452
private void validateStagePB(StagePB stagePB) throws AnalysisException {
360453
stageType = stagePB.getType();

regression-test/suites/load_p0/copy_into/test_copy_into.groovy

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,71 @@ suite("test_copy_into", "p0") {
150150
}
151151
assertTrue(false, "should not come here")
152152
}
153+
154+
def csvStageName = "test_copy_into_csv"
155+
try_sql """drop stage if exists ${csvStageName}"""
156+
sql """
157+
create stage if not exists ${csvStageName}
158+
properties ('endpoint' = '${getS3Endpoint()}' ,
159+
'region' = '${getS3Region()}' ,
160+
'bucket' = '${getS3BucketName()}' ,
161+
'prefix' = 'regression' ,
162+
'ak' = '${getS3AK()}' ,
163+
'sk' = '${getS3SK()}' ,
164+
'provider' = '${getS3Provider()}',
165+
'access_type' = 'aksk',
166+
'default.file.column_separator' = "|");
167+
"""
168+
169+
sql """ DROP TABLE IF EXISTS copy_into_select_placeholder; """
170+
sql """
171+
CREATE TABLE copy_into_select_placeholder (
172+
p_partkey int NOT NULL DEFAULT "1",
173+
p_name VARCHAR(55) NOT NULL DEFAULT "2",
174+
p_mfgr VARCHAR(25) NOT NULL DEFAULT "3"
175+
)ENGINE=OLAP
176+
DUPLICATE KEY(`p_partkey`)
177+
COMMENT "OLAP"
178+
DISTRIBUTED BY HASH(`p_partkey`) BUCKETS 3;
179+
"""
180+
181+
result = sql """
182+
copy into copy_into_select_placeholder
183+
from (select \$1, \$2, \$3 from @${csvStageName}('tpch/sf1/part.csv.split00.gz'))
184+
properties ('file.type' = 'csv', 'file.column_separator' = '|',
185+
'file.compression' = 'gz', 'copy.async' = 'false');
186+
"""
187+
logger.info("copy select placeholder result: " + result)
188+
assertTrue(result.size() == 1)
189+
assertTrue(result[0][1].equals("FINISHED"),
190+
"Finish copy into, state=" + result[0][1] + ", expected state=FINISHED")
191+
def selectPlaceholderCount = sql """ SELECT COUNT(*) FROM copy_into_select_placeholder; """
192+
assertTrue((selectPlaceholderCount[0][0] as long) > 0)
193+
194+
sql """ DROP TABLE IF EXISTS copy_into_filter_placeholder; """
195+
sql """
196+
CREATE TABLE copy_into_filter_placeholder (
197+
p_partkey int NOT NULL DEFAULT "1"
198+
)ENGINE=OLAP
199+
DUPLICATE KEY(`p_partkey`)
200+
COMMENT "OLAP"
201+
DISTRIBUTED BY HASH(`p_partkey`) BUCKETS 3;
202+
"""
203+
204+
result = sql """
205+
copy into copy_into_filter_placeholder (p_partkey)
206+
from (select 1 from @${csvStageName}('tpch/sf1/part.csv.split00.gz') where \$1 is not null)
207+
properties ('file.type' = 'csv', 'file.column_separator' = '|',
208+
'file.compression' = 'gz', 'copy.async' = 'false');
209+
"""
210+
logger.info("copy filter placeholder result: " + result)
211+
assertTrue(result.size() == 1)
212+
assertTrue(result[0][1].equals("FINISHED"),
213+
"Finish copy into, state=" + result[0][1] + ", expected state=FINISHED")
214+
def filterPlaceholderCount = sql """ SELECT COUNT(*) FROM copy_into_filter_placeholder; """
215+
assertTrue((filterPlaceholderCount[0][0] as long) > 0)
216+
217+
try_sql """drop stage if exists ${csvStageName}"""
153218
try_sql """drop stage if exists ${externalStageName}"""
154219
}
155220
}

0 commit comments

Comments
 (0)