Skip to content

Commit c9af2c6

Browse files
authored
chore: Auto scan mode no longer falls back to native_comet (#3236)
1 parent ea264a3 commit c9af2c6

2 files changed

Lines changed: 51 additions & 36 deletions

File tree

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -167,7 +167,6 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
167167
case SCAN_AUTO =>
168168
// TODO add support for native_datafusion in the future
169169
nativeIcebergCompatScan(session, scanExec, r, hadoopConf)
170-
.orElse(nativeCometScan(session, scanExec, r, hadoopConf))
171170
.getOrElse(scanExec)
172171
case SCAN_NATIVE_DATAFUSION =>
173172
nativeDataFusionScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)

spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala

Lines changed: 51 additions & 35 deletions
Original file line number | Diff line number | Diff line change
@@ -187,53 +187,69 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
187187
}
188188

189189
test("basic data type support") {
190+
// this test requires native_comet scan due to unsigned u8/u16 issue
191+
withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
192+
Seq(true, false).foreach { dictionaryEnabled =>
193+
withTempDir { dir =>
194+
val path = new Path(dir.toURI.toString, "test.parquet")
195+
makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
196+
withParquetTable(path.toString, "tbl") {
197+
checkSparkAnswerAndOperator("select * FROM tbl WHERE _2 > 100")
198+
}
199+
}
200+
}
201+
}
202+
}
203+
204+
test("basic data type support - excluding u8/u16") {
205+
// variant that skips _9 (UINT_8) and _10 (UINT_16) for default scan impl
190206
Seq(true, false).foreach { dictionaryEnabled =>
191207
withTempDir { dir =>
192208
val path = new Path(dir.toURI.toString, "test.parquet")
193209
makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = dictionaryEnabled, 10000)
194-
withSQLConf(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "false") {
210+
withParquetTable(path.toString, "tbl") {
211+
// select all columns except _9 (UINT_8) and _10 (UINT_16)
212+
checkSparkAnswerAndOperator(
213+
"""select _1, _2, _3, _4, _5, _6, _7, _8, _11, _12, _13, _14, _15, _16, _17,
214+
|_18, _19, _20, _21, _id FROM tbl WHERE _2 > 100""".stripMargin)
215+
}
216+
}
217+
}
218+
}
219+
220+
test("uint data type support") {
221+
// this test requires native_comet scan due to unsigned u8/u16 issue
222+
withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET) {
223+
Seq(true, false).foreach { dictionaryEnabled =>
224+
withTempDir { dir =>
225+
val path = new Path(dir.toURI.toString, "testuint.parquet")
226+
makeParquetFileAllPrimitiveTypes(
227+
path,
228+
dictionaryEnabled = dictionaryEnabled,
229+
Byte.MinValue,
230+
Byte.MaxValue)
195231
withParquetTable(path.toString, "tbl") {
196-
checkSparkAnswerAndOperator("select * FROM tbl WHERE _2 > 100")
232+
val qry = "select _9 from tbl order by _11"
233+
checkSparkAnswerAndOperator(qry)
197234
}
198235
}
199236
}
200237
}
201238
}
202239

203-
test("uint data type support") {
240+
test("uint data type support - excluding u8/u16") {
241+
// variant that tests UINT_32 and UINT_64, skipping _9 (UINT_8) and _10 (UINT_16)
204242
Seq(true, false).foreach { dictionaryEnabled =>
205-
// TODO: Once the question of what to get back from uint_8, uint_16 types is resolved,
206-
// we can also update this test to check for COMET_SCAN_ALLOW_INCOMPATIBLE=true
207-
Seq(false).foreach { allowIncompatible =>
208-
{
209-
withSQLConf(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> allowIncompatible.toString) {
210-
withTempDir { dir =>
211-
val path = new Path(dir.toURI.toString, "testuint.parquet")
212-
makeParquetFileAllPrimitiveTypes(
213-
path,
214-
dictionaryEnabled = dictionaryEnabled,
215-
Byte.MinValue,
216-
Byte.MaxValue)
217-
withParquetTable(path.toString, "tbl") {
218-
val qry = "select _9 from tbl order by _11"
219-
if (usingDataSourceExec(conf)) {
220-
if (!allowIncompatible) {
221-
checkSparkAnswerAndOperator(qry)
222-
} else {
223-
// need to convert the values to unsigned values
224-
val expected = (Byte.MinValue to Byte.MaxValue)
225-
.map(v => {
226-
if (v < 0) Byte.MaxValue.toShort - v else v
227-
})
228-
.toDF("a")
229-
checkAnswer(sql(qry), expected)
230-
}
231-
} else {
232-
checkSparkAnswerAndOperator(qry)
233-
}
234-
}
235-
}
236-
}
243+
withTempDir { dir =>
244+
val path = new Path(dir.toURI.toString, "testuint.parquet")
245+
makeParquetFileAllPrimitiveTypes(
246+
path,
247+
dictionaryEnabled = dictionaryEnabled,
248+
Byte.MinValue,
249+
Byte.MaxValue)
250+
withParquetTable(path.toString, "tbl") {
251+
// test UINT_32 (_11) and UINT_64 (_12) only
252+
checkSparkAnswerAndOperator("select _11, _12 from tbl order by _11")
237253
}
238254
}
239255
}

0 commit comments

Comments
 (0)