Skip to content

Commit 875982c

Browse files
vranescloud-fan
authored andcommitted
[SPARK-57440][SQL] Gate BIN BY behind spark.sql.binByRelationOperator.enabled
### What changes were proposed in this pull request? Follows up #56426 (BIN BY parsing and resolution). Tracked under SPARK-57440. Adds an internal session config `spark.sql.binByRelationOperator.enabled` (default `false`) that gates the `BIN BY` relation operator at analysis time. - New `BIN_BY_ENABLED` conf in `SQLConf` (config key `spark.sql.binByRelationOperator.enabled`). - `ResolveBinBy` short-circuits to `QueryCompilationErrors.binByDisabledError()` when the conf is off, before resolving the operator. - The gate reuses the existing `UNSUPPORTED_FEATURE.BIN_BY` condition (already raised by the execution stub) rather than introducing a new one. Its message is generalized to "The BIN BY relation operator is not yet supported." so both the analysis-time gate (flag off) and the execution stub (flag on) surface the same condition. ### Why are the changes needed? `BIN BY` resolves today but has no physical execution yet (#56426 stubs it). A config gate keeps the incomplete operator disabled by default and lets it be enabled for development and testing, instead of exposing a half-implemented operator. Defaulting off is honest while execution is unimplemented. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - `ResolveBinBySuite`: existing cases run with the flag enabled (via a `test()` override); a new `super.test` case asserts the operator is rejected with `UNSUPPORTED_FEATURE.BIN_BY` when the flag is off. - `BinBySuite`: the stub test runs with the flag on; a new case asserts the flag-off gate fails at analysis. - `SparkThrowableSuite` passes (error-conditions.json golden). - `catalyst/Test/compile` + `sql/Test/compile` and scalastyle on the touched modules are clean. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code (Claude Fable 5) Closes #56487 from vranes/bin-by-oss-config. Authored-by: Nikolina Vraneš <nikolina.vranes@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 6330518 commit 875982c

6 files changed

Lines changed: 81 additions & 20 deletions

File tree

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8094,7 +8094,7 @@
80948094
},
80958095
"BIN_BY" : {
80968096
"message" : [
8097-
"Physical execution of BIN BY is not yet implemented."
8097+
"The BIN BY relation operator is not yet supported."
80988098
]
80998099
},
81008100
"CATALOG_INTERFACE_METHOD" : {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveBinBy.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ object ResolveBinBy extends Rule[LogicalPlan] {
3939

4040
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
4141
_.containsPattern(UNRESOLVED_BIN_BY), ruleId) {
42+
case _: UnresolvedBinBy if !SQLConf.get.getConf(SQLConf.BIN_BY_ENABLED) =>
43+
throw QueryCompilationErrors.binByDisabledError()
4244
case b: UnresolvedBinBy if !readyToResolve(b) => b
4345
case b: UnresolvedBinBy => resolve(b)
4446
}

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
313313
messageParameters = Map("columnName" -> toSQLId(columnName)))
314314
}
315315

316+
def binByDisabledError(): Throwable = {
317+
new AnalysisException(
318+
errorClass = "UNSUPPORTED_FEATURE.BIN_BY",
319+
messageParameters = Map.empty)
320+
}
321+
316322
def binByRequiresTopLevelColumnError(columnName: String): Throwable = {
317323
new AnalysisException(
318324
errorClass = "BIN_BY_REQUIRES_TOP_LEVEL_COLUMN",

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5193,6 +5193,16 @@ object SQLConf {
51935193
.booleanConf
51945194
.createWithDefault(true)
51955195

5196+
val BIN_BY_ENABLED =
5197+
buildConf("spark.sql.binByRelationOperator.enabled")
5198+
.doc("Enable the BIN BY relation operator for aligning range-typed rows to " +
5199+
"fixed-width bin boundaries.")
5200+
.internal()
5201+
.version("4.3.0")
5202+
.withBindingPolicy(ConfigBindingPolicy.SESSION)
5203+
.booleanConf
5204+
.createWithDefault(false)
5205+
51965206
object StoreAssignmentPolicy extends Enumeration {
51975207
val ANSI, LEGACY, STRICT = Value
51985208
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveBinBySuite.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package org.apache.spark.sql.catalyst.analysis
1919

20+
import org.scalactic.source.Position
21+
import org.scalatest.Tag
22+
2023
import org.apache.spark.SparkThrowable
2124
import org.apache.spark.sql.catalyst.QueryPlanningTracker
2225
import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -28,6 +31,17 @@ import org.apache.spark.sql.types._
2831

2932
class ResolveBinBySuite extends AnalysisTest {
3033

34+
// BIN BY is gated off by default; run the resolution tests with it enabled. The dedicated
35+
// gate test below uses `super.test` to observe the default-off behavior.
36+
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)
37+
(implicit pos: Position): Unit = {
38+
super.test(testName, testTags: _*) {
39+
withSQLConf(SQLConf.BIN_BY_ENABLED.key -> "true") {
40+
testFun
41+
}
42+
}
43+
}
44+
3145
private val tsStart = $"ts_start".timestamp
3246
private val tsEnd = $"ts_end".timestamp
3347
private val tsStartNtz = $"ts_start".timestampNTZ
@@ -304,4 +318,10 @@ class ResolveBinBySuite extends AnalysisTest {
304318
assert(appendedExprIds.distinct.size == appendedExprIds.size,
305319
"appended BinBy attributes must have distinct exprIds across the two join sides")
306320
}
321+
322+
// `super.test` escapes the suite-wide flag-on wrapper so this runs with the default (off).
323+
super.test("BIN BY is gated off by default") {
324+
assert(!SQLConf.get.getConf(SQLConf.BIN_BY_ENABLED))
325+
expectError(unresolved(), "UNSUPPORTED_FEATURE.BIN_BY")
326+
}
307327
}

sql/core/src/test/scala/org/apache/spark/sql/BinBySuite.scala

Lines changed: 42 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,32 +17,55 @@
1717

1818
package org.apache.spark.sql
1919

20-
import org.apache.spark.SparkUnsupportedOperationException
20+
import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException}
21+
import org.apache.spark.sql.internal.SQLConf
2122
import org.apache.spark.sql.test.SharedSparkSession
2223

2324
class BinBySuite extends QueryTest with SharedSparkSession {
2425

26+
private def createMetricsView(): Unit = {
27+
spark.sql(
28+
"""SELECT TIMESTAMP '2024-01-01 00:00:00' AS ts_start,
29+
| TIMESTAMP '2024-01-01 01:00:00' AS ts_end,
30+
| CAST(1 AS DOUBLE) AS value""".stripMargin).createOrReplaceTempView("metrics")
31+
}
32+
33+
private val binByQuery =
34+
"""SELECT * FROM metrics BIN BY (
35+
| RANGE ts_start TO ts_end
36+
| BIN WIDTH INTERVAL '5' MINUTE
37+
| DISTRIBUTE UNIFORM (value)
38+
|)""".stripMargin
39+
2540
test("BIN BY analyzes but physical execution is not yet implemented") {
41+
withSQLConf(SQLConf.BIN_BY_ENABLED.key -> "true") {
42+
withTempView("metrics") {
43+
createMetricsView()
44+
val df = spark.sql(binByQuery)
45+
46+
// Analysis is fully functional when the operator is enabled.
47+
df.queryExecution.assertAnalyzed()
48+
49+
// Physical execution is stubbed until the follow-up PR; planning surfaces a clean
50+
// UNSUPPORTED_FEATURE error rather than an internal error.
51+
checkError(
52+
exception = intercept[SparkUnsupportedOperationException] {
53+
df.collect()
54+
},
55+
condition = "UNSUPPORTED_FEATURE.BIN_BY",
56+
parameters = Map.empty[String, String])
57+
}
58+
}
59+
}
60+
61+
test("BIN BY is gated off by default") {
2662
withTempView("metrics") {
27-
spark.sql(
28-
"""SELECT TIMESTAMP '2024-01-01 00:00:00' AS ts_start,
29-
| TIMESTAMP '2024-01-01 01:00:00' AS ts_end,
30-
| CAST(1 AS DOUBLE) AS value""".stripMargin).createOrReplaceTempView("metrics")
31-
val df = spark.sql(
32-
"""SELECT * FROM metrics BIN BY (
33-
| RANGE ts_start TO ts_end
34-
| BIN WIDTH INTERVAL '5' MINUTE
35-
| DISTRIBUTE UNIFORM (value)
36-
|)""".stripMargin)
37-
38-
// Analysis is fully functional in this PR.
39-
df.queryExecution.assertAnalyzed()
40-
41-
// Physical execution is stubbed until the follow-up PR; planning surfaces a clean
42-
// UNSUPPORTED_FEATURE error rather than an internal error.
63+
createMetricsView()
64+
// Gated off, the operator is rejected at analysis with the same UNSUPPORTED_FEATURE.BIN_BY
65+
// condition the execution stub raises when enabled.
4366
checkError(
44-
exception = intercept[SparkUnsupportedOperationException] {
45-
df.collect()
67+
exception = intercept[SparkThrowable] {
68+
spark.sql(binByQuery).queryExecution.assertAnalyzed()
4669
},
4770
condition = "UNSUPPORTED_FEATURE.BIN_BY",
4871
parameters = Map.empty[String, String])

0 commit comments

Comments
 (0)