Skip to content

feat: support NullType in row-to-Arrow conversion and shuffle#4460

Merged
mbutrovich merged 5 commits into
apache:mainfrom
mbutrovich:fix_4457
May 29, 2026
Merged

feat: support NullType in row-to-Arrow conversion and shuffle#4460
mbutrovich merged 5 commits into
apache:mainfrom
mbutrovich:fix_4457

Conversation

@mbutrovich
Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Closes #4457.

Rationale for this change

NullType columns currently break Comet at the row-to-Arrow boundary. Utils.toArrowType throws UnsupportedOperationException for NullType, which surfaces in two places:

  1. CometLocalTableScanExec when a LocalTableScan contains a NullType column (the case in Queries with NullType aggregate fails when native LocalTableScanExec is enabled #4457, e.g. SELECT max(col) FROM VALUES (NULL), (NULL) AS t(col)).
  2. CometShuffleExchangeExec when a Spark LocalTableScanExec with a NullType column feeds a Comet shuffle.

NullType is well-defined in Arrow (ArrowType.Null) and the Spark ArrowWriter already has a NullWriter case, so the right fix is to support it end-to-end rather than fall back. This PR is an alternative to #4458, which adds a LocalTableScanExec-only fallback and leaves the shuffle path broken.

What changes are included in this PR?

  • Utils.toArrowType maps NullType to ArrowType.Null.
  • CometShuffleExchangeExec.supportedSerializableDataType accepts NullType for both native and columnar shuffle.
  • Native shuffle row reader (native/shuffle/src/spark_unsafe/row.rs) handles NullType.

How are these changes tested?

New regression tests:

  • CometExecSuite "CometLocalTableScanExec handles NullType nested in struct/array/map" covers NullType nested under StructType, ArrayType, and MapType through CometLocalTableScanExec.
  • CometColumnarShuffleSuite "columnar shuffle with NullType passthrough column" covers JVM-input columnar shuffle with a NullType column. Replaces the older "Fallback to Spark for unsupported input besides ordering" test, which asserted the previous fallback behavior.
  • CometNativeShuffleSuite "native shuffle with NullType passthrough column" covers native shuffle with a Comet LocalTableScan source containing a NullType column. Gated on spark.comet.exec.localTableScan.enabled=true because native shuffle requires Comet input.

@mbutrovich mbutrovich requested a review from andygrove May 28, 2026 00:52
@mbutrovich mbutrovich added this to the 0.17.0 milestone May 28, 2026
Copy link
Copy Markdown
Contributor

@kazuyukitanimura kazuyukitanimura left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to create a sql test file for LocalTableScanExec ?


test("CometLocalTableScanExec handles NullType column") {
withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") {
val df = spark.sql("SELECT * FROM VALUES ('a', null), ('b', null) AS t(x, y)")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just making sure whether constant folding optimization is not affecting anything here....?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I manually inspected the plans, but the native shuffle suite is a more exhaustive check since it sees that we shuffled the data.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review @kazuyukitanimura!

@mbutrovich mbutrovich merged commit 053080b into apache:main May 29, 2026
62 checks passed
@mbutrovich mbutrovich deleted the fix_4457 branch May 29, 2026 00:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Queries with NullType aggregate fails when native LocalTableScanExec is enabled

2 participants