Skip to content

Commit 52ab85c

Browse files
andygroveclaude
andcommitted
fix: restore deleted tests and fix inaccurate documentation
- Restore original ignored tests for ROWS BETWEEN with PARTITION BY + ORDER BY (COUNT, SUM, AVG with various ROWS BETWEEN frames) - Fix documentation to accurately reflect what's supported: - Remove AVG from supported list (has native implementation issues) - Clarify PARTITION BY/ORDER BY restriction (partition must be subset of order) - Clarify ROWS BETWEEN limitations - Fix misleading test names ("COUNT with ROWS frame" -> "COUNT with PARTITION BY only") Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent d50e34e commit 52ab85c

2 files changed

Lines changed: 104 additions & 11 deletions

File tree

docs/source/user-guide/latest/compatibility.md

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,18 +66,23 @@ this can be overridden by setting `spark.comet.regexp.allowIncompatible=true`.
6666

6767
## Window Functions
6868

69-
Comet supports window aggregate functions with ROWS BETWEEN frames. These are enabled by default when
70-
`spark.comet.exec.window.enabled=true` (the default). Ranking functions (ROW_NUMBER, RANK, etc.)
71-
and offset functions (LAG, LEAD) are not yet supported and will automatically fall back to Spark.
69+
Comet supports a subset of window aggregate functions. These are enabled by default when
70+
`spark.comet.exec.window.enabled=true` (the default). Unsupported window functions will automatically
71+
fall back to Spark.
7272

7373
**Supported:**
7474

75-
- Window aggregates: `COUNT`, `SUM`, `AVG`, `MIN`, `MAX`
76-
- Frame types: `ROWS BETWEEN` with `UNBOUNDED PRECEDING`, `CURRENT ROW`, `UNBOUNDED FOLLOWING`, and numeric offsets
77-
- `PARTITION BY` and `ORDER BY` clauses (can be different columns)
75+
- Window aggregates: `COUNT`, `SUM`, `MIN`, `MAX`
76+
- `PARTITION BY` only (no `ORDER BY`)
77+
- `ORDER BY` only (no `PARTITION BY`)
78+
- `PARTITION BY` with `ORDER BY` when partition columns are a subset of order columns
79+
(e.g., `PARTITION BY a ORDER BY a, b` works, but `PARTITION BY a ORDER BY b` does not)
7880

79-
**Not Supported:**
81+
**Not Yet Supported:**
8082

83+
- `AVG` window aggregate (native implementation has known issues)
84+
- `PARTITION BY` with `ORDER BY` using different columns (falls back to Spark)
85+
- `ROWS BETWEEN` frames with `PARTITION BY` and `ORDER BY` on different columns
8186
- Ranking functions: `ROW_NUMBER`, `RANK`, `DENSE_RANK`, `PERCENT_RANK`, `NTILE`, `CUME_DIST`
8287
- Offset functions: `LAG`, `LEAD`
8388
- Value functions: `FIRST_VALUE`, `LAST_VALUE`, `NTH_VALUE`

spark/src/test/scala/org/apache/comet/exec/CometWindowExecSuite.scala

Lines changed: 92 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -365,8 +365,8 @@ class CometWindowExecSuite extends CometTestBase {
365365
}
366366
}
367367

368-
// COUNT with ROWS frame (ORDER BY with PARTITION BY not yet fully supported in native)
369-
test("window: COUNT with ROWS frame") {
368+
// COUNT with PARTITION BY only (no ORDER BY)
369+
test("window: COUNT with PARTITION BY only") {
370370
withTempDir { dir =>
371371
(0 until 30)
372372
.map(i => (i % 3, i % 5, i))
@@ -386,7 +386,7 @@ class CometWindowExecSuite extends CometTestBase {
386386
}
387387
}
388388

389-
// SUM with PARTITION BY only (ORDER BY with PARTITION BY not yet fully supported in native)
389+
// SUM with PARTITION BY only (no ORDER BY)
390390
test("window: SUM with PARTITION BY only") {
391391
withTempDir { dir =>
392392
(0 until 30)
@@ -407,6 +407,94 @@ class CometWindowExecSuite extends CometTestBase {
407407
}
408408
}
409409

410+
// TODO: COUNT with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW produces incorrect results
411+
// Falls back to Spark - partition expressions must be subset of order expressions
412+
ignore("window: COUNT with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW") {
413+
withTempDir { dir =>
414+
(0 until 30)
415+
.map(i => (i % 3, i % 5, i))
416+
.toDF("a", "b", "c")
417+
.repartition(3)
418+
.write
419+
.mode("overwrite")
420+
.parquet(dir.toString)
421+
422+
spark.read.parquet(dir.toString).createOrReplaceTempView("window_test")
423+
val df = sql("""
424+
SELECT a, b, c,
425+
COUNT(*) OVER (PARTITION BY a ORDER BY b ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as cnt
426+
FROM window_test
427+
""")
428+
checkSparkAnswerAndOperator(df)
429+
}
430+
}
431+
432+
// TODO: SUM with ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING produces incorrect results
433+
// Falls back to Spark - partition expressions must be subset of order expressions
434+
ignore("window: SUM with ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING") {
435+
withTempDir { dir =>
436+
(0 until 30)
437+
.map(i => (i % 3, i % 5, i))
438+
.toDF("a", "b", "c")
439+
.repartition(3)
440+
.write
441+
.mode("overwrite")
442+
.parquet(dir.toString)
443+
444+
spark.read.parquet(dir.toString).createOrReplaceTempView("window_test")
445+
val df = sql("""
446+
SELECT a, b, c,
447+
SUM(c) OVER (PARTITION BY a ORDER BY b ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) as sum_c
448+
FROM window_test
449+
""")
450+
checkSparkAnswerAndOperator(df)
451+
}
452+
}
453+
454+
// TODO: AVG with ROWS BETWEEN produces incorrect results
455+
// Falls back to Spark - partition expressions must be subset of order expressions
456+
ignore("window: AVG with ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING") {
457+
withTempDir { dir =>
458+
(0 until 30)
459+
.map(i => (i % 3, i % 5, i))
460+
.toDF("a", "b", "c")
461+
.repartition(3)
462+
.write
463+
.mode("overwrite")
464+
.parquet(dir.toString)
465+
466+
spark.read.parquet(dir.toString).createOrReplaceTempView("window_test")
467+
val df = sql("""
468+
SELECT a, b, c,
469+
AVG(c) OVER (PARTITION BY a ORDER BY b ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as avg_c
470+
FROM window_test
471+
""")
472+
checkSparkAnswerAndOperator(df)
473+
}
474+
}
475+
476+
// TODO: SUM with ROWS BETWEEN produces incorrect results
477+
// Falls back to Spark - partition expressions must be subset of order expressions
478+
ignore("window: SUM with ROWS BETWEEN 2 PRECEDING AND CURRENT ROW") {
479+
withTempDir { dir =>
480+
(0 until 30)
481+
.map(i => (i % 3, i % 5, i))
482+
.toDF("a", "b", "c")
483+
.repartition(3)
484+
.write
485+
.mode("overwrite")
486+
.parquet(dir.toString)
487+
488+
spark.read.parquet(dir.toString).createOrReplaceTempView("window_test")
489+
val df = sql("""
490+
SELECT a, b, c,
491+
SUM(c) OVER (PARTITION BY a ORDER BY b ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) as sum_c
492+
FROM window_test
493+
""")
494+
checkSparkAnswerAndOperator(df)
495+
}
496+
}
497+
410498
// TODO: COUNT with ROWS BETWEEN not supported
411499
// Falls back to Spark Window operator - "Partitioning and sorting specifications must be the same"
412500
ignore("window: COUNT with ROWS BETWEEN CURRENT ROW AND 2 FOLLOWING") {
@@ -1003,7 +1091,7 @@ class CometWindowExecSuite extends CometTestBase {
10031091
}
10041092
}
10051093

1006-
// Multiple aggregate functions in single query (TODO: fix AVG support in native window)
1094+
// Multiple aggregate functions in single query (COUNT, SUM, MIN, MAX with PARTITION BY only)
10071095
test("window: multiple aggregate functions in single query") {
10081096
withTempDir { dir =>
10091097
(0 until 30)

0 commit comments

Comments
 (0)