Skip to content

Commit e702792

Browse files
Mohammad Linjawi
authored and committed
[GLUTEN-10215][VL] Delta write: Offload top-level NOT NULL checks
1 parent d8135ab commit e702792

12 files changed

Lines changed: 734 additions & 64 deletions

File tree

backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,15 @@ import org.apache.gluten.extension.columnar.transition.Transitions
2121

2222
import org.apache.spark.sql.{AnalysisException, Dataset}
2323
import org.apache.spark.sql.delta.actions.{AddFile, FileAction}
24-
import org.apache.spark.sql.delta.constraints.{Constraint, Constraints, DeltaInvariantCheckerExec}
24+
import org.apache.spark.sql.delta.constraints.{Constraint, Constraints, DeltaInvariantCheckerExec, GlutenDeltaInvariantChecker}
2525
import org.apache.spark.sql.delta.files.{GlutenDeltaFileFormatWriter, TransactionalWrite}
2626
import org.apache.spark.sql.delta.hooks.AutoCompact
2727
import org.apache.spark.sql.delta.perf.{DeltaOptimizedWriterExec, GlutenDeltaOptimizedWriterExec}
2828
import org.apache.spark.sql.delta.schema.InnerInvariantViolationException
2929
import org.apache.spark.sql.delta.sources.DeltaSQLConf
3030
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
3131
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
32-
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormatWriter, WriteJobStatsTracker}
32+
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormatWriter, V1WritesUtils, WriteJobStatsTracker}
3333
import org.apache.spark.sql.internal.SQLConf
3434
import org.apache.spark.sql.util.ScalaExtensions.OptionExt
3535
import org.apache.spark.util.SerializableConfiguration
@@ -91,53 +91,64 @@ class GlutenOptimisticTransaction(delegate: OptimisticTransaction)
9191

9292
val empty2NullPlan =
9393
convertEmptyToNullIfNeeded(queryExecution.executedPlan, partitioningColumns, constraints)
94-
val maybeCheckInvariants = if (constraints.isEmpty) {
94+
val rowInvariantPlan = if (constraints.isEmpty) {
9595
// Compared to vanilla Delta, we simply avoid adding the invariant checker
9696
// when the constraint list is empty, to prevent the unnecessary transitions
9797
// from being added around the invariant checker.
9898
empty2NullPlan
9999
} else {
100100
DeltaInvariantCheckerExec(empty2NullPlan, constraints)
101101
}
102+
val nativeInvariantChecker = if (V1WritesUtils.getWriteFilesOpt(empty2NullPlan).isEmpty) {
103+
GlutenDeltaInvariantChecker.create(empty2NullPlan.output, constraints)
104+
} else {
105+
None
106+
}
107+
val nativeInvariantPlan = nativeInvariantChecker.map(_ => empty2NullPlan).getOrElse(
108+
rowInvariantPlan)
102109
def toVeloxPlan(plan: SparkPlan): SparkPlan = plan match {
103110
case aqe: AdaptiveSparkPlanExec =>
104111
assert(!aqe.isFinalPlan)
105112
aqe.copy(supportsColumnar = true)
106-
case _ => Transitions.toBatchPlan(maybeCheckInvariants, VeloxBatchType)
113+
case _ => Transitions.toBatchPlan(plan, VeloxBatchType)
107114
}
108115
// No need to plan optimized write if the write command is OPTIMIZE, which aims to produce
109116
// evenly-balanced data files already.
110-
val physicalPlan =
117+
val (physicalPlan, nativeInvariantCheckerForWrite) =
111118
if (
112119
!isOptimize &&
113120
shouldOptimizeWrite(writeOptions, spark.sessionState.conf)
114121
) {
115122
// We uniformly convert the query plan to a columnar plan. If
116123
// the further write operation turns out to be non-offload-able, the
117124
// columnar plan will be converted back to a row-based plan.
118-
val veloxPlan = toVeloxPlan(maybeCheckInvariants)
125+
val veloxPlan = toVeloxPlan(nativeInvariantPlan)
119126
try {
120127
val glutenWriterExec =
121128
GlutenDeltaOptimizedWriterExec(veloxPlan, metadata.partitionColumns, deltaLog)
122129
val validationResult = glutenWriterExec.doValidate()
123130
if (validationResult.ok()) {
124-
glutenWriterExec
131+
(glutenWriterExec, nativeInvariantChecker)
125132
} else {
126133
logInfo(
127134
s"GlutenDeltaOptimizedWriterExec: Internal shuffle validated negative," +
128135
s" reason: ${validationResult.reason()}. Falling back to row-based shuffle.")
129-
DeltaOptimizedWriterExec(maybeCheckInvariants, metadata.partitionColumns, deltaLog)
136+
(
137+
DeltaOptimizedWriterExec(rowInvariantPlan, metadata.partitionColumns, deltaLog),
138+
None)
130139
}
131140
} catch {
132141
case e: AnalysisException =>
133142
logWarning(
134143
s"GlutenDeltaOptimizedWriterExec: Failed to create internal shuffle," +
135144
s" reason: ${e.getMessage()}. Falling back to row-based shuffle.")
136-
DeltaOptimizedWriterExec(maybeCheckInvariants, metadata.partitionColumns, deltaLog)
145+
(
146+
DeltaOptimizedWriterExec(rowInvariantPlan, metadata.partitionColumns, deltaLog),
147+
None)
137148
}
138149
} else {
139-
val veloxPlan = toVeloxPlan(maybeCheckInvariants)
140-
veloxPlan
150+
val veloxPlan = toVeloxPlan(nativeInvariantPlan)
151+
(veloxPlan, nativeInvariantChecker)
141152
}
142153

143154
val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer()
@@ -185,7 +196,8 @@ class GlutenOptimisticTransaction(delegate: OptimisticTransaction)
185196
optionalStatsTracker.toSeq
186197
++ statsTrackers
187198
++ identityTrackerOpt.toSeq,
188-
options = options
199+
options = options,
200+
nativeInvariantChecker = nativeInvariantCheckerForWrite
189201
)
190202
} catch {
191203
case InnerInvariantViolationException(violationException) =>
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.sql.delta.constraints
18+
19+
import org.apache.gluten.columnarbatch.VeloxColumnarBatches
20+
import org.apache.gluten.execution.{PlaceholderRow, TerminalRow}
21+
22+
import org.apache.spark.sql.catalyst.InternalRow
23+
import org.apache.spark.sql.catalyst.expressions.Attribute
24+
import org.apache.spark.sql.delta.constraints.Constraints.NotNull
25+
import org.apache.spark.sql.delta.schema.{DeltaInvariantViolationException, SchemaUtils}
26+
import org.apache.spark.sql.vectorized.ColumnarBatch
27+
28+
/**
29+
* Native write-time invariant checker for constraints that can be validated without converting
30+
* Velox batches back to Spark rows.
31+
*/
32+
private[delta] case class GlutenDeltaInvariantChecker private (
33+
notNullConstraints: Seq[(Int, NotNull)])
34+
extends Serializable {
35+
36+
@transient private lazy val columnOrdinals: Array[Int] =
37+
notNullConstraints.map(_._1).toArray
38+
39+
def wrap(rows: Iterator[InternalRow]): Iterator[InternalRow] = {
40+
rows.map {
41+
row =>
42+
check(row)
43+
row
44+
}
45+
}
46+
47+
private def check(row: InternalRow): Unit = row match {
48+
case _: PlaceholderRow =>
49+
case terminal: TerminalRow => check(terminal.batch())
50+
case other => checkRow(other)
51+
}
52+
53+
private def check(batch: ColumnarBatch): Unit = {
54+
val failedConstraintIndex = VeloxColumnarBatches.firstNullColumnIndex(batch, columnOrdinals)
55+
if (failedConstraintIndex >= 0) {
56+
throw DeltaInvariantViolationException(notNullConstraints(failedConstraintIndex)._2)
57+
}
58+
}
59+
60+
private def checkRow(row: InternalRow): Unit = {
61+
var i = 0
62+
while (i < notNullConstraints.size) {
63+
val (ordinal, constraint) = notNullConstraints(i)
64+
if (row.isNullAt(ordinal)) {
65+
throw DeltaInvariantViolationException(constraint)
66+
}
67+
i += 1
68+
}
69+
}
70+
}
71+
72+
private[delta] object GlutenDeltaInvariantChecker {

  /**
   * Builds a checker when every constraint in `constraints` can be handled by it.
   *
   * A checker is produced only if `constraints` is non-empty, every constraint is a [[NotNull]]
   * on a single-part (top-level) column, and each of those columns can be resolved against
   * `output` with `SchemaUtils.DELTA_COL_RESOLVER`.
   *
   * @param output
   *   attributes of the plan being written; a column's position in this sequence becomes the
   *   ordinal that is checked.
   * @param constraints
   *   all constraints that apply to the write.
   * @return
   *   `Some(checker)` when all constraints are supported, `None` otherwise.
   */
  def create(
      output: Seq[Attribute],
      constraints: Seq[Constraint]): Option[GlutenDeltaInvariantChecker] = {
    val topLevelNotNullConstraints = constraints.collect {
      case constraint: NotNull if constraint.column.length == 1 => constraint
    }
    // Anything that is not a top-level NOT NULL constraint is dropped by the `collect` above, so
    // a size mismatch means at least one constraint is unsupported.
    val allSupported =
      constraints.nonEmpty && topLevelNotNullConstraints.size == constraints.size

    if (!allSupported) {
      None
    } else {
      val checks = topLevelNotNullConstraints.map {
        constraint =>
          // `head` is safe: the guard above kept only single-part column paths.
          val columnName = constraint.column.head
          val ordinal = output.indexWhere {
            attribute => SchemaUtils.DELTA_COL_RESOLVER(attribute.name, columnName)
          }
          ordinal -> constraint
      }
      // `indexWhere` yields -1 for a column that is missing from `output`; give up in that case
      // instead of using a nonlocal `return` from inside the lambda.
      if (checks.exists(_._1 < 0)) None else Some(GlutenDeltaInvariantChecker(checks))
    }
  }
}

backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
3535
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
3636
import org.apache.spark.sql.connector.write.WriterCommitMessage
3737
import org.apache.spark.sql.delta.DeltaOptions
38+
import org.apache.spark.sql.delta.constraints.GlutenDeltaInvariantChecker
3839
import org.apache.spark.sql.delta.logging.DeltaLogKeys
3940
import org.apache.spark.sql.delta.stats.GlutenDeltaJobStatsTracker
4041
import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -71,6 +72,12 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
7172
/** A variable used in tests to check the final executed plan. */
7273
private var executedPlan: Option[SparkPlan] = None
7374

75+
/** Test-only accessor returning the current value of `executedPlan`. */
private[delta] def getExecutedPlanForTesting: Option[SparkPlan] = executedPlan
76+
77+
/** Test-only hook that resets `executedPlan` to `None`. */
private[delta] def clearExecutedPlanForTesting(): Unit = {
78+
executedPlan = None
79+
}
80+
7481
// scalastyle:off argcount
7582
/**
7683
* Basic work flow of this command is:
@@ -96,7 +103,8 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
96103
bucketSpec: Option[BucketSpec],
97104
statsTrackers: Seq[WriteJobStatsTracker],
98105
options: Map[String, String],
99-
numStaticPartitionCols: Int = 0): Set[String] = {
106+
numStaticPartitionCols: Int = 0,
107+
nativeInvariantChecker: Option[GlutenDeltaInvariantChecker] = None): Set[String] = {
100108
require(partitionColumns.size >= numStaticPartitionCols)
101109

102110
val job = Job.getInstance(hadoopConf)
@@ -225,7 +233,8 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
225233
partitionColumns,
226234
sortColumns,
227235
orderingMatched,
228-
isNativeWritable
236+
isNativeWritable,
237+
nativeInvariantChecker
229238
)
230239
}
231240
}
@@ -242,7 +251,8 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
242251
partitionColumns: Seq[Attribute],
243252
sortColumns: Seq[Attribute],
244253
orderingMatched: Boolean,
245-
writeOffloadable: Boolean): Set[String] = {
254+
writeOffloadable: Boolean,
255+
nativeInvariantChecker: Option[GlutenDeltaInvariantChecker]): Set[String] = {
246256
val projectList = V1WritesUtils.convertEmptyToNull(plan.output, partitionColumns)
247257
val empty2NullPlan =
248258
if (projectList.nonEmpty) ProjectExecTransformer(projectList, plan) else plan
@@ -318,7 +328,8 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
318328
committer,
319329
iterator = iter,
320330
concurrentOutputWriterSpec = concurrentOutputWriterSpec,
321-
partitionColumnToDataType
331+
partitionColumnToDataType,
332+
nativeInvariantChecker
322333
)
323334
},
324335
rddWithNonEmptyPartitions.partitions.indices,
@@ -433,7 +444,8 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
433444
committer: FileCommitProtocol,
434445
iterator: Iterator[InternalRow],
435446
concurrentOutputWriterSpec: Option[ConcurrentOutputWriterSpec],
436-
partitionColumnToDataType: Map[String, DataType]): WriteTaskResult = {
447+
partitionColumnToDataType: Map[String, DataType],
448+
nativeInvariantChecker: Option[GlutenDeltaInvariantChecker]): WriteTaskResult = {
437449

438450
val jobId = SparkHadoopWriterUtils.createJobID(jobTrackerID, sparkStageId)
439451
val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
@@ -487,7 +499,8 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
487499
try {
488500
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
489501
// Execute the task to write rows out and commit the task.
490-
dataWriter.writeWithIterator(iterator)
502+
val rowsToWrite = nativeInvariantChecker.map(_.wrap(iterator)).getOrElse(iterator)
503+
dataWriter.writeWithIterator(rowsToWrite)
491504
dataWriter.commit()
492505
})(
493506
catchBlock = {

0 commit comments

Comments
 (0)