
Commit d6fcdf8

Author: Minni Mittal

Add LocalTableScanExec support to Velox backend

Parent: 7669052

6 files changed

Lines changed: 290 additions & 0 deletions
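In effect, queries built from driver-side local collections, which Spark plans as LocalTableScanExec, can now stay on Velox's columnar path instead of requiring a fallback or a separate row-to-columnar transition. A minimal sketch of the query shape this commit targets (the session setup is illustrative; Gluten with the Velox backend is assumed to be enabled):

import org.apache.spark.sql.SparkSession

// Illustrative session; Gluten with the Velox backend is assumed enabled.
val spark = SparkSession.builder().appName("local-scan-demo").getOrCreate()
import spark.implicits._

// Spark plans this local Seq as LocalTableScanExec. With this commit the
// Velox backend replaces that node with VeloxLocalTableScanTransformer,
// so the downstream filter and aggregate consume Velox batches directly.
val df = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "name")
df.filter($"id" > 1).groupBy("name").count().explain()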


backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala

Lines changed: 8 additions & 0 deletions
@@ -1212,6 +1212,14 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging {
     VeloxColumnarToCarrierRowExec.enforce(plan)
   }
 
+  override def isSupportLocalTableScanExec(plan: LocalTableScanExec): Boolean = {
+    // Skip offloading when stream is defined (structured streaming source)
+    plan.getStream.isEmpty
+  }
+
+  override def getLocalTableScanTransform(plan: LocalTableScanExec): LocalTableScanTransformer =
+    VeloxLocalTableScanTransformer.replace(plan)
+
   override def genTimestampAddTransformer(
       substraitExprName: String,
       left: ExpressionTransformer,
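For context on the stream guard above: batch DataFrames built from local collections plan a LocalTableScanExec with no stream attached, while micro-batch structured streaming sources (for example MemoryStream, an assumption here rather than something this diff states) surface nodes whose stream handle is set, and those are left to vanilla Spark. A hedged restatement of the check:

import org.apache.spark.sql.execution.LocalTableScanExec

// Mirrors the override above: offload only when no streaming source is
// attached to the node. getStream is the same accessor the override uses.
def shouldOffload(plan: LocalTableScanExec): Boolean = plan.getStream.isEmpty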
Lines changed: 108 additions & 0 deletions
@@ -0,0 +1,108 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.gluten.execution

import org.apache.gluten.backendsapi.velox.VeloxValidatorApi
import org.apache.gluten.config.{GlutenConfig, VeloxConfig}

import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, UnsafeProjection}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{LocalTableScanTransformer, SparkPlan}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
 * Velox-backend implementation of LocalTableScanTransformer.
 *
 * Converts a driver-side local collection (Seq[InternalRow]) into columnar batches using Velox's
 * native row-to-columnar conversion (same JNI path as RowToVeloxColumnarExec).
 */
case class VeloxLocalTableScanTransformer(
    outputAttributes: Seq[Attribute],
    rows: Seq[InternalRow],
    // Row-to-columnar conversion preserves data distribution, so we carry through
    // the original partitioning, consistent with RowToVeloxColumnarExec's behavior.
    override val outputPartitioning: Partitioning,
    override val outputOrdering: Seq[SortOrder]
) extends LocalTableScanTransformer(outputAttributes, outputPartitioning, outputOrdering)
  with Logging {

  @transient override lazy val metrics: Map[String, SQLMetric] = Map(
    "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
    "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"),
    "convertTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to convert")
  )

  override protected def doValidateInternal(): ValidationResult = {
    for (field <- schema.fields) {
      val reason = VeloxValidatorApi.validateSchema(field.dataType)
      if (reason.isDefined) {
        return ValidationResult.failed(reason.get)
      }
    }
    ValidationResult.succeeded
  }

  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
    val numInputRows = longMetric("numInputRows")
    val numOutputBatches = longMetric("numOutputBatches")
    val convertTime = longMetric("convertTime")
    val localSchema = this.schema
    val batchSize = GlutenConfig.get.maxBatchSize
    val batchBytes = VeloxConfig.get.veloxPreferredBatchBytes

    if (rows.isEmpty) {
      sparkContext.emptyRDD[ColumnarBatch]
    } else {
      // Materialize rows as UnsafeRow on the driver, then parallelize
      val proj = UnsafeProjection.create(outputAttributes, outputAttributes)
      val unsafeRows = rows.map(r => proj(r).copy()).toArray
      val numSlices = math.min(unsafeRows.length, sparkContext.defaultParallelism)
      val rowRdd = sparkContext.parallelize(unsafeRows.toSeq, numSlices)

      rowRdd.mapPartitions {
        iter =>
          RowToVeloxColumnarExec.toColumnarBatchIterator(
            iter,
            localSchema,
            numInputRows,
            numOutputBatches,
            convertTime,
            batchSize,
            batchBytes)
      }
    }
  }

  override protected def withNewChildrenInternal(
      newChildren: IndexedSeq[SparkPlan]): SparkPlan = {
    assert(newChildren.isEmpty, "VeloxLocalTableScanTransformer is a leaf node")
    copy(outputAttributes, rows, outputPartitioning, outputOrdering)
  }
}

object VeloxLocalTableScanTransformer {
  def replace(plan: org.apache.spark.sql.execution.LocalTableScanExec): LocalTableScanTransformer =
    VeloxLocalTableScanTransformer(
      plan.output,
      plan.rows,
      plan.outputPartitioning,
      plan.outputOrdering)
}
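One detail worth noting in doExecuteColumnar above is the proj(r).copy() during driver-side materialization. UnsafeProjection reuses a single output row buffer across calls, so collecting its results without copy() would leave every array slot aliasing the bytes of the last row. A minimal sketch of the pattern (the helper name is illustrative, not part of the commit):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.StructType

// Illustrative helper: project rows to UnsafeRow and detach each result
// from the projection's shared buffer before buffering them in an array.
def materialize(rows: Seq[InternalRow], schema: StructType): Array[UnsafeRow] = {
  val proj = UnsafeProjection.create(schema)
  // Dropping copy() here would make every element point at the final row.
  rows.map(r => proj(r).copy()).toArray
}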
Lines changed: 116 additions & 0 deletions
@@ -0,0 +1,116 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.execution

import org.apache.gluten.execution._

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.types._

import java.util.{Arrays => JArrays}

class VeloxLocalTableScanSuite
  extends VeloxWholeStageTransformerSuite
  with AdaptiveSparkPlanHelper {

  override protected val resourcePath: String = "/tpch-data-parquet"
  override protected val fileFormat: String = "parquet"

  override protected def sparkConf: SparkConf = {
    super.sparkConf
      .set("spark.sql.ansi.enabled", "false")
  }

  private def assertHasVeloxLocalTableScan(df: DataFrame): Unit = {
    val found = collect(df.queryExecution.executedPlan) {
      case _: VeloxLocalTableScanTransformer => true
    }
    assert(found.nonEmpty, "Expected VeloxLocalTableScanTransformer in plan")
  }

  private def createDF(rows: Seq[Row], schema: StructType): DataFrame = {
    spark.createDataFrame(JArrays.asList(rows: _*), schema)
  }

  test("basic LocalTableScanExec with int and string columns") {
    val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
    val rows = Seq(Row(1, "a"), Row(2, "b"), Row(3, "c"))
    val df = createDF(rows, schema)
    checkAnswer(df, rows)
    assertHasVeloxLocalTableScan(df)
  }

  test("LocalTableScan with numeric types") {
    val schema = StructType(
      Seq(
        StructField("lng", LongType),
        StructField("dbl", DoubleType),
        StructField("flt", FloatType),
        StructField("shrt", ShortType),
        StructField("byt", ByteType)))
    val rows = Seq(Row(1L, 1.5, 2.5f, 100.toShort, 42.toByte))
    val df = createDF(rows, schema)
    checkAnswer(df, rows)
    assertHasVeloxLocalTableScan(df)
  }

  test("LocalTableScan with boolean and null types") {
    val schema = StructType(
      Seq(StructField("flag", BooleanType), StructField("value", IntegerType, nullable = true)))
    val rows = Seq(Row(true, 1), Row(false, null))
    val df = createDF(rows, schema)
    checkAnswer(df, rows)
    assertHasVeloxLocalTableScan(df)
  }

  test("LocalTableScan with empty collection") {
    val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
    val df = createDF(Seq.empty, schema)
    checkAnswer(df, Seq.empty[Row])
  }

  test("LocalTableScan with aggregation downstream") {
    val schema = StructType(Seq(StructField("key", StringType), StructField("value", IntegerType)))
    val rows = Seq(Row("a", 10), Row("b", 20), Row("a", 30))
    val df = createDF(rows, schema)
    val result = df.groupBy("key").sum("value")
    checkAnswer(result, Seq(Row("a", 40), Row("b", 20)))
    assertHasVeloxLocalTableScan(result)
  }

  test("LocalTableScan with filter downstream") {
    val schema = StructType(Seq(StructField("x", IntegerType)))
    val rows = Seq(Row(1), Row(2), Row(3), Row(4), Row(5))
    val df = createDF(rows, schema).filter("x > 3")
    checkAnswer(df, Seq(Row(4), Row(5)))
    assertHasVeloxLocalTableScan(df)
  }

  test("LocalTableScan with join") {
    val leftSchema =
      StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
    val rightSchema =
      StructType(Seq(StructField("id", IntegerType), StructField("score", IntegerType)))
    val left = createDF(Seq(Row(1, "a"), Row(2, "b")), leftSchema)
    val right = createDF(Seq(Row(1, 100), Row(2, 200)), rightSchema)
    val result = left.join(right, "id")
    checkAnswer(result, Seq(Row(1, "a", 100), Row(2, "b", 200)))
    assertHasVeloxLocalTableScan(result)
  }
}

gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala

Lines changed: 5 additions & 0 deletions
@@ -777,6 +777,11 @@ trait SparkPlanExecApi {
   def getRDDScanTransform(plan: RDDScanExec): RDDScanTransformer =
     throw new GlutenNotSupportException("RDDScanExec is not supported")
 
+  def isSupportLocalTableScanExec(plan: LocalTableScanExec): Boolean = false
+
+  def getLocalTableScanTransform(plan: LocalTableScanExec): LocalTableScanTransformer =
+    throw new GlutenNotSupportException("LocalTableScanExec is not supported")
+
   def copyColumnarBatch(batch: ColumnarBatch): ColumnarBatch =
     throw new GlutenNotSupportException("Copying ColumnarBatch is not supported")
gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala

Lines changed: 3 additions & 0 deletions
@@ -315,6 +315,9 @@ object OffloadOthers {
           child)
       case plan: RDDScanExec if RDDScanTransformer.isSupportRDDScanExec(plan) =>
         RDDScanTransformer.getRDDScanTransform(plan)
+      case plan: LocalTableScanExec
+          if LocalTableScanTransformer.isSupportLocalTableScanExec(plan) =>
+        LocalTableScanTransformer.getLocalTableScanTransform(plan)
       case p if !p.isInstanceOf[GlutenPlan] =>
         logDebug(s"Transformation for ${p.getClass} is currently not supported.")
         p
Lines changed: 50 additions & 0 deletions
@@ -0,0 +1,50 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.execution

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.ValidatablePlan
import org.apache.gluten.extension.columnar.transition.Convention

import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}

abstract class LocalTableScanTransformer(
    outputAttributes: Seq[Attribute],
    override val outputPartitioning: Partitioning = UnknownPartitioning(0),
    override val outputOrdering: Seq[SortOrder] = Nil
) extends ValidatablePlan {

  override def rowType0(): Convention.RowType = Convention.RowType.None
  override def batchType(): Convention.BatchType = BackendsApiManager.getSettings.primaryBatchType
  override def output: Seq[Attribute] = outputAttributes

  override protected def doExecute()
      : org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
    throw new UnsupportedOperationException("This operator doesn't support doExecute().")
  }

  override def children: Seq[SparkPlan] = Seq.empty
}

object LocalTableScanTransformer {
  def isSupportLocalTableScanExec(plan: LocalTableScanExec): Boolean =
    BackendsApiManager.getSparkPlanExecApiInstance.isSupportLocalTableScanExec(plan)

  def getLocalTableScanTransform(plan: LocalTableScanExec): LocalTableScanTransformer =
    BackendsApiManager.getSparkPlanExecApiInstance.getLocalTableScanTransform(plan)
}
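Taken together with the OffloadOthers rule above, this companion keeps the offload best-effort: the node is replaced only when the active backend opts in, and the transformer's own validation (the per-field schema check in the Velox implementation) can still trigger a fallback later. A rough sketch of the decision, as illustration rather than literal Gluten code:

import org.apache.spark.sql.execution.{LocalTableScanExec, LocalTableScanTransformer, SparkPlan}

// Illustrative only: the real dispatch lives in OffloadOthers, and fallback
// on failed validation is handled by Gluten's validation phase.
def offloadLocalScan(plan: LocalTableScanExec): SparkPlan =
  if (LocalTableScanTransformer.isSupportLocalTableScanExec(plan)) {
    LocalTableScanTransformer.getLocalTableScanTransform(plan)
  } else {
    plan
  }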
