Skip to content

Commit 9718dcd

Browse files
mariotaddeucci and evertlammerts
authored and committed
Add nth_value window function with tests
1 parent 2f254fc commit 9718dcd

File tree

2 files changed

+91
-0
lines changed

2 files changed

+91
-0
lines changed

duckdb/experimental/spark/sql/functions.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6571,3 +6571,79 @@ def lead(col: "ColumnOrName", offset: int = 1, default: Optional[Any] = None) ->
65716571
+---+---+----------+
65726572
""" # noqa: D205, D212
65736573
return _invoke_function("lead", _to_column_expr(col), ConstantExpression(offset), ConstantExpression(default))
6574+
6575+
6576+
def nth_value(col: "ColumnOrName", offset: int, ignoreNulls: Optional[bool] = False) -> Column:
    """Window function: returns the value that is the `offset`\\th row of the window frame
    (counting from 1), and `null` if the size of window frame is less than `offset` rows.

    It will return the `offset`\\th non-null value it sees when `ignoreNulls` is set to
    true. If all values are null, then null is returned.

    This is equivalent to the nth_value function in SQL.

    .. versionadded:: 3.1.0

    .. versionchanged:: 3.4.0
        Supports Spark Connect.

    Parameters
    ----------
    col : :class:`~pyspark.sql.Column` or column name
        name of column or expression
    offset : int
        number of row to use as the value
    ignoreNulls : bool, optional
        indicates the Nth value should skip null in the
        determination of which row to use

    Returns
    -------
    :class:`~pyspark.sql.Column`
        value of nth row.

    Examples
    --------
    >>> from pyspark.sql import functions as sf
    >>> from pyspark.sql import Window
    >>> df = spark.createDataFrame(
    ...     [("a", 1), ("a", 2), ("a", 3), ("b", 8), ("b", 2)], ["c1", "c2"]
    ... )
    >>> w = Window.partitionBy("c1").orderBy("c2")
    >>> df.withColumn("nth_value", sf.nth_value("c2", 1).over(w)).show()
    +---+---+---------+
    | c1| c2|nth_value|
    +---+---+---------+
    |  a|  1|        1|
    |  a|  2|        1|
    |  a|  3|        1|
    |  b|  2|        2|
    |  b|  8|        2|
    +---+---+---------+

    >>> df.withColumn("nth_value", sf.nth_value("c2", 2).over(w)).show()
    +---+---+---------+
    | c1| c2|nth_value|
    +---+---+---------+
    |  a|  1|     NULL|
    |  a|  2|        2|
    |  a|  3|        2|
    |  b|  2|     NULL|
    |  b|  8|        8|
    +---+---+---------+
    """  # noqa: D205, D301
    if ignoreNulls:
        # NOTE(review): this wrapper does not forward an IGNORE NULLS modifier
        # to the underlying function yet, so a truthy ignoreNulls is rejected
        # up front rather than silently producing the respect-nulls result.
        msg = "The ignoreNulls option of nth_value is not supported yet."
        raise ContributionsAcceptedError(msg)
    # Delegate to the SQL nth_value window function; offset is passed through
    # as a constant expression (1-based row position within the frame).
    return _invoke_function(
        "nth_value",
        _to_column_expr(col),
        ConstantExpression(offset),
    )

tests/fast/spark/test_spark_functions_window.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,3 +162,18 @@ def test_lead(self, spark):
162162
Row(c1="b", c2=2, next_value=8, next_value_default=8, next_value_offset2=-1),
163163
Row(c1="b", c2=8, next_value=None, next_value_default=0, next_value_offset2=-1),
164164
]
165+
166+
def test_nth_value(self, spark):
    # Two partitions, ordered by c2: offset 1 is always defined once the frame
    # has a first row, while offset 2 is NULL on each partition's first row.
    window = Window.partitionBy("c1").orderBy("c2")
    result = (
        spark.createDataFrame(
            data=[("a", 1), ("a", 2), ("a", 3), ("b", 8), ("b", 2)],
            schema=["c1", "c2"],
        )
        .withColumn("nth1", F.nth_value("c2", 1).over(window))
        .withColumn("nth2", F.nth_value("c2", 2).over(window))
        .sort("c1", "c2")
        .collect()
    )

    assert result == [
        Row(c1="a", c2=1, nth1=1, nth2=None),
        Row(c1="a", c2=2, nth1=1, nth2=2),
        Row(c1="a", c2=3, nth1=1, nth2=2),
        Row(c1="b", c2=2, nth1=2, nth2=None),
        Row(c1="b", c2=8, nth1=2, nth2=8),
    ]

0 commit comments

Comments
 (0)