@@ -365,8 +365,8 @@ class CometWindowExecSuite extends CometTestBase {
365365 }
366366 }
367367
368- // COUNT with ROWS frame (ORDER BY with PARTITION BY not yet fully supported in native )
369- test(" window: COUNT with ROWS frame " ) {
368+ // COUNT with PARTITION BY only (no ORDER BY )
369+ test(" window: COUNT with PARTITION BY only " ) {
370370 withTempDir { dir =>
371371 (0 until 30 )
372372 .map(i => (i % 3 , i % 5 , i))
@@ -386,7 +386,7 @@ class CometWindowExecSuite extends CometTestBase {
386386 }
387387 }
388388
389- // SUM with PARTITION BY only (ORDER BY with PARTITION BY not yet fully supported in native )
389+ // SUM with PARTITION BY only (no ORDER BY)
390390 test(" window: SUM with PARTITION BY only" ) {
391391 withTempDir { dir =>
392392 (0 until 30 )
@@ -407,9 +407,9 @@ class CometWindowExecSuite extends CometTestBase {
407407 }
408408 }
409409
410- // TODO: COUNT with ROWS BETWEEN not supported
411- // Falls back to Spark Window operator - "Partitioning and sorting specifications must be the same"
412- ignore (" window: COUNT with ROWS BETWEEN CURRENT ROW AND 2 FOLLOWING " ) {
410+ // ROWS BETWEEN with PARTITION BY a ORDER BY b falls back to Spark
411+ // (partition expressions must be subset of order expressions)
412+ test (" window: COUNT with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW falls back " ) {
413413 withTempDir { dir =>
414414 (0 until 30 )
415415 .map(i => (i % 3 , i % 5 , i))
@@ -420,18 +420,106 @@ class CometWindowExecSuite extends CometTestBase {
420420 .parquet(dir.toString)
421421
422422 spark.read.parquet(dir.toString).createOrReplaceTempView(" window_test" )
423- val df = sql("""
423+ checkSparkAnswerAndFallbackReason(
424+ """
425+ SELECT a, b, c,
426+ COUNT(*) OVER (PARTITION BY a ORDER BY b ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as cnt
427+ FROM window_test
428+ """ ,
429+ " Partition expressions must be a subset of order expressions for native window" )
430+ }
431+ }
432+
433+ // ROWS BETWEEN with PARTITION BY a ORDER BY b falls back to Spark
434+ test(" window: SUM with ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING falls back" ) {
435+ withTempDir { dir =>
436+ (0 until 30 )
437+ .map(i => (i % 3 , i % 5 , i))
438+ .toDF(" a" , " b" , " c" )
439+ .repartition(3 )
440+ .write
441+ .mode(" overwrite" )
442+ .parquet(dir.toString)
443+
444+ spark.read.parquet(dir.toString).createOrReplaceTempView(" window_test" )
445+ checkSparkAnswerAndFallbackReason(
446+ """
447+ SELECT a, b, c,
448+ SUM(c) OVER (PARTITION BY a ORDER BY b ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) as sum_c
449+ FROM window_test
450+ """ ,
451+ " Partition expressions must be a subset of order expressions for native window" )
452+ }
453+ }
454+
455+ // AVG with ROWS BETWEEN falls back to Spark
456+ test(" window: AVG with ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING falls back" ) {
457+ withTempDir { dir =>
458+ (0 until 30 )
459+ .map(i => (i % 3 , i % 5 , i))
460+ .toDF(" a" , " b" , " c" )
461+ .repartition(3 )
462+ .write
463+ .mode(" overwrite" )
464+ .parquet(dir.toString)
465+
466+ spark.read.parquet(dir.toString).createOrReplaceTempView(" window_test" )
467+ checkSparkAnswerAndFallbackReason(
468+ """
469+ SELECT a, b, c,
470+ AVG(c) OVER (PARTITION BY a ORDER BY b ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as avg_c
471+ FROM window_test
472+ """ ,
473+ " Partition expressions must be a subset of order expressions for native window" )
474+ }
475+ }
476+
477+ // SUM with ROWS BETWEEN falls back to Spark
478+ test(" window: SUM with ROWS BETWEEN 2 PRECEDING AND CURRENT ROW falls back" ) {
479+ withTempDir { dir =>
480+ (0 until 30 )
481+ .map(i => (i % 3 , i % 5 , i))
482+ .toDF(" a" , " b" , " c" )
483+ .repartition(3 )
484+ .write
485+ .mode(" overwrite" )
486+ .parquet(dir.toString)
487+
488+ spark.read.parquet(dir.toString).createOrReplaceTempView(" window_test" )
489+ checkSparkAnswerAndFallbackReason(
490+ """
491+ SELECT a, b, c,
492+ SUM(c) OVER (PARTITION BY a ORDER BY b ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) as sum_c
493+ FROM window_test
494+ """ ,
495+ " Partition expressions must be a subset of order expressions for native window" )
496+ }
497+ }
498+
499+ // COUNT with ROWS BETWEEN falls back to Spark
500+ test(" window: COUNT with ROWS BETWEEN CURRENT ROW AND 2 FOLLOWING falls back" ) {
501+ withTempDir { dir =>
502+ (0 until 30 )
503+ .map(i => (i % 3 , i % 5 , i))
504+ .toDF(" a" , " b" , " c" )
505+ .repartition(3 )
506+ .write
507+ .mode(" overwrite" )
508+ .parquet(dir.toString)
509+
510+ spark.read.parquet(dir.toString).createOrReplaceTempView(" window_test" )
511+ checkSparkAnswerAndFallbackReason(
512+ """
424513 SELECT a, b, c,
425514 COUNT(*) OVER (PARTITION BY a ORDER BY b ROWS BETWEEN CURRENT ROW AND 2 FOLLOWING) as cnt
426515 FROM window_test
427- """ )
428- checkSparkAnswerAndOperator(df )
516+ """ ,
517+ " Partition expressions must be a subset of order expressions for native window " )
429518 }
430519 }
431520
432- // TODO: MAX with ROWS BETWEEN UNBOUNDED not supported
433- // Falls back to Spark Window operator - "Partitioning and sorting specifications must be the same"
434- ignore(" window: MAX with ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING" ) {
521+ // MAX with ROWS BETWEEN UNBOUNDED falls back to Spark
522+ test(" window: MAX with ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING falls back" ) {
435523 withTempDir { dir =>
436524 (0 until 30 )
437525 .map(i => (i % 3 , i % 5 , i))
@@ -442,12 +530,13 @@ class CometWindowExecSuite extends CometTestBase {
442530 .parquet(dir.toString)
443531
444532 spark.read.parquet(dir.toString).createOrReplaceTempView(" window_test" )
445- val df = sql("""
533+ checkSparkAnswerAndFallbackReason(
534+ """
446535 SELECT a, b, c,
447536 MAX(c) OVER (PARTITION BY a ORDER BY b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as max_c
448537 FROM window_test
449- """ )
450- checkSparkAnswerAndOperator(df )
538+ """ ,
539+ " Partition expressions must be a subset of order expressions for native window " )
451540 }
452541 }
453542
@@ -778,9 +867,8 @@ class CometWindowExecSuite extends CometTestBase {
778867 }
779868 }
780869
781- // TODO: ORDER BY DESC with aggregation not supported
782- // Falls back to Spark Window operator - "Partitioning and sorting specifications must be the same"
783- ignore(" window: ORDER BY DESC with aggregation" ) {
870+ // ORDER BY DESC with PARTITION BY falls back to Spark
871+ test(" window: ORDER BY DESC with aggregation falls back" ) {
784872 withTempDir { dir =>
785873 (0 until 30 )
786874 .map(i => (i % 3 , i % 5 , i))
@@ -791,18 +879,18 @@ class CometWindowExecSuite extends CometTestBase {
791879 .parquet(dir.toString)
792880
793881 spark.read.parquet(dir.toString).createOrReplaceTempView(" window_test" )
794- val df = sql("""
882+ checkSparkAnswerAndFallbackReason(
883+ """
795884 SELECT a, b, c,
796885 SUM(c) OVER (PARTITION BY a ORDER BY b DESC) as sum_c_desc
797886 FROM window_test
798- """ )
799- checkSparkAnswerAndOperator(df )
887+ """ ,
888+ " Partition expressions must be a subset of order expressions for native window " )
800889 }
801890 }
802891
803- // TODO: Multiple PARTITION BY columns not supported
804- // Falls back to Spark Window operator
805- ignore(" window: multiple PARTITION BY columns" ) {
892+ // Multiple PARTITION BY columns with different ORDER BY falls back to Spark
893+ test(" window: multiple PARTITION BY columns falls back" ) {
806894 withTempDir { dir =>
807895 (0 until 30 )
808896 .map(i => (i % 3 , i % 5 , i % 2 , i))
@@ -813,18 +901,18 @@ class CometWindowExecSuite extends CometTestBase {
813901 .parquet(dir.toString)
814902
815903 spark.read.parquet(dir.toString).createOrReplaceTempView(" window_test" )
816- val df = sql("""
904+ checkSparkAnswerAndFallbackReason(
905+ """
817906 SELECT a, b, d, c,
818907 SUM(c) OVER (PARTITION BY a, b ORDER BY d) as sum_c
819908 FROM window_test
820- """ )
821- checkSparkAnswerAndOperator(df )
909+ """ ,
910+ " Partition expressions must be a subset of order expressions for native window " )
822911 }
823912 }
824913
825- // TODO: Multiple ORDER BY columns not supported
826- // Falls back to Spark Window operator
827- ignore(" window: multiple ORDER BY columns" ) {
914+ // ROW_NUMBER with multiple ORDER BY columns falls back to Spark
915+ test(" window: multiple ORDER BY columns falls back" ) {
828916 withTempDir { dir =>
829917 (0 until 30 )
830918 .map(i => (i % 3 , i % 5 , i % 2 , i))
@@ -835,18 +923,18 @@ class CometWindowExecSuite extends CometTestBase {
835923 .parquet(dir.toString)
836924
837925 spark.read.parquet(dir.toString).createOrReplaceTempView(" window_test" )
838- val df = sql("""
926+ checkSparkAnswerAndFallbackReason(
927+ """
839928 SELECT a, b, d, c,
840929 ROW_NUMBER() OVER (PARTITION BY a ORDER BY b, d, c) as row_num
841930 FROM window_test
842- """ )
843- checkSparkAnswerAndOperator(df )
931+ """ ,
932+ " Partition expressions must be a subset of order expressions for native window " )
844933 }
845934 }
846935
847- // TODO: RANGE BETWEEN with numeric ORDER BY not supported
848- // Falls back to Spark Window operator - "Partitioning and sorting specifications must be the same"
849- ignore(" window: RANGE BETWEEN with numeric ORDER BY" ) {
936+ // RANGE BETWEEN with numeric ORDER BY falls back to Spark
937+ test(" window: RANGE BETWEEN with numeric ORDER BY falls back" ) {
850938 withTempDir { dir =>
851939 (0 until 30 )
852940 .map(i => (i % 3 , i, i * 2 ))
@@ -857,18 +945,18 @@ class CometWindowExecSuite extends CometTestBase {
857945 .parquet(dir.toString)
858946
859947 spark.read.parquet(dir.toString).createOrReplaceTempView(" window_test" )
860- val df = sql("""
948+ checkSparkAnswerAndFallbackReason(
949+ """
861950 SELECT a, b, c,
862951 SUM(c) OVER (PARTITION BY a ORDER BY b RANGE BETWEEN 2 PRECEDING AND 2 FOLLOWING) as sum_c
863952 FROM window_test
864- """ )
865- checkSparkAnswerAndOperator(df )
953+ """ ,
954+ " Partition expressions must be a subset of order expressions for native window " )
866955 }
867956 }
868957
869- // TODO: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW not supported
870- // Falls back to Spark Window operator - "Partitioning and sorting specifications must be the same"
871- ignore(" window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" ) {
958+ // RANGE BETWEEN UNBOUNDED falls back to Spark
959+ test(" window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW falls back" ) {
872960 withTempDir { dir =>
873961 (0 until 30 )
874962 .map(i => (i % 3 , i, i * 2 ))
@@ -879,12 +967,13 @@ class CometWindowExecSuite extends CometTestBase {
879967 .parquet(dir.toString)
880968
881969 spark.read.parquet(dir.toString).createOrReplaceTempView(" window_test" )
882- val df = sql("""
970+ checkSparkAnswerAndFallbackReason(
971+ """
883972 SELECT a, b, c,
884973 SUM(c) OVER (PARTITION BY a ORDER BY b RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as sum_c
885974 FROM window_test
886- """ )
887- checkSparkAnswerAndOperator(df )
975+ """ ,
976+ " Partition expressions must be a subset of order expressions for native window " )
888977 }
889978 }
890979
@@ -1003,7 +1092,7 @@ class CometWindowExecSuite extends CometTestBase {
10031092 }
10041093 }
10051094
1006- // Multiple aggregate functions in single query (TODO: fix AVG support in native window )
1095+ // Multiple aggregate functions in single query (COUNT, SUM, MIN, MAX with PARTITION BY only )
10071096 test(" window: multiple aggregate functions in single query" ) {
10081097 withTempDir { dir =>
10091098 (0 until 30 )
0 commit comments