Skip to content

Commit 2c244aa

Browse files
committed
Add documentation, python bindings for rf_agg_approx_quantiles
Attempt to register with SQL Signed-off-by: Jason T. Brown <jason@astraea.earth>
1 parent f466e38 commit 2c244aa

7 files changed

Lines changed: 69 additions & 1 deletion

File tree

core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/ApproxCellQuantilesAggregate.scala

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@ package org.locationtech.rasterframes.expressions.aggregates
2323

2424
import geotrellis.raster.{Tile, isNoData}
2525
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
26+
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, AggregateMode, Complete}
27+
import org.apache.spark.sql.catalyst.expressions.{ExprId, Expression, ExpressionDescription, NamedExpression}
2628
import org.apache.spark.sql.catalyst.util.QuantileSummaries
29+
import org.apache.spark.sql.execution.aggregate.ScalaUDAF
2730
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
2831
import org.apache.spark.sql.{Column, Encoder, Row, TypedColumn, types}
2932
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
@@ -85,4 +88,29 @@ object ApproxCellQuantilesAggregate {
8588
.as(s"rf_agg_approx_quantiles")
8689
.as[Seq[Double]]
8790
}
91+
92+
/** Adapter hack to allow UserDefinedAggregateFunction to be referenced as an expression. */
93+
@ExpressionDescription(
94+
usage = "_FUNC_(tile, probabilities, relativeError) - Compute approximate quantiles of cell values over a tile column.",
95+
arguments = """
96+
Arguments:
97+
* tile - tile column to analyze
98+
* probabilities - array of double values in [0, 1] at which to compute quantiles
99+
* relativeError - non-negative error tolerance""",
100+
examples = """
101+
Examples:
102+
> SELECT _FUNC_(tile, array(0.1, 0.25, 0.5, 0.75, 0.9), 0.001);
103+
..."""
104+
)
105+
class ApproxCellQuantilesUDAF(aggregateFunction: AggregateFunction, mode: AggregateMode, isDistinct: Boolean, resultId: ExprId)
106+
extends AggregateExpression(aggregateFunction, mode, isDistinct, resultId) {
107+
def this(child: Expression, probabilities: Seq[Double], relativeError: Double) =
108+
this(ScalaUDAF(Seq(ExtractTile(child)), new ApproxCellQuantilesAggregate(probabilities, relativeError)), Complete, false, NamedExpression.newExprId)
109+
override def nodeName: String = "rf_agg_approx_quantiles"
110+
}
111+
112+
object ApproxCellQuantilesUDAF {
113+
def apply(child: Expression, probabilities: Seq[Double], relativeError: Double): ApproxCellQuantilesUDAF = new ApproxCellQuantilesUDAF(child, probabilities, relativeError)
114+
def apply(child: Expression, probabilities: Seq[Double]): ApproxCellQuantilesUDAF = new ApproxCellQuantilesUDAF(child, probabilities, 0.00001)
115+
}
88116
}

core/src/main/scala/org/locationtech/rasterframes/expressions/package.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ package object expressions {
117117
registry.registerExpression[CellCountAggregate.NoDataCells]("rf_agg_no_data_cells")
118118
registry.registerExpression[CellStatsAggregate.CellStatsAggregateUDAF]("rf_agg_stats")
119119
registry.registerExpression[HistogramAggregate.HistogramAggregateUDAF]("rf_agg_approx_histogram")
120+
registry.registerExpression[ApproxCellQuantilesAggregate.ApproxCellQuantilesUDAF]("rf_agg_approx_quantiles")
120121
registry.registerExpression[LocalStatsAggregate.LocalStatsAggregateUDAF]("rf_agg_local_stats")
121122
registry.registerExpression[LocalTileOpAggregate.LocalMinUDAF]("rf_agg_local_min")
122123
registry.registerExpression[LocalTileOpAggregate.LocalMaxUDAF]("rf_agg_local_max")

core/src/test/scala/org/locationtech/rasterframes/RasterFramesStatsSpec.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,17 @@ class RasterFramesStatsSpec extends TestEnvironment with TestData {
7171
// computing externally with numpy we arrive at 7963, 10068, 12160 for these quantiles
7272
result should contain inOrderOnly(7963.0, 10068.0, 12160.0)
7373
}
74+
75+
it("should compute approx percentiles with SQL") {
76+
val result = df.selectExpr("rf_agg_approx_quantiles(tile, array(0.1, 0.5, 0.9), 0.00001) as iles")
77+
.first()
78+
.getSeq[Double](0)
79+
80+
result.length should be (3)
81+
82+
// computing externally with numpy we arrive at 7963, 10068, 12160 for these quantiles
83+
result should contain inOrderOnly(7963.0, 10068.0, 12160.0)
84+
}
7485
}
7586
}
7687

docs/src/main/paradox/reference.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,11 @@ Aggregates over the `tile` and returns statistical summaries of cell values: num
628628

629629
Aggregates over all of the rows of the `tile` column in a DataFrame and returns a count of each cell value to create a histogram, with values plotted on the x-axis and counts on the y-axis. Related is the @ref:[`rf_tile_histogram`](reference.md#rf-tile-histogram) function which operates on a single row at a time.
630630

631+
### rf_agg_approx_quantiles
632+
633+
Array[Double] rf_agg_approx_quantiles(Tile tile, List[float] probabilities, float relative_error)
634+
635+
Calculates the approximate quantiles of a tile column of a DataFrame. `probabilities` is a list of float values at which to compute the quantiles. These must belong to [0, 1]. For example 0 is the minimum, 0.5 is the median, 1 is the maximum. `relative_error` is the relative target precision to achieve; it must be greater than or equal to 0. Returns an array of values approximately at the specified `probabilities`.
631636

632637
## Tile Local Aggregate Statistics
633638

docs/src/main/paradox/release-notes.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
* Added `withSpatialIndex` to RasterSourceDataSource to pre-partition tiles based on tile extents mapped to a Z2 space-filling curve
1010
* Add `rf_mask_by_bit`, `rf_mask_by_bits` and `rf_local_extract_bits` to deal with bit packed quality masks. Updated the masking documentation to demonstrate the use of these functions.
1111
* Throw an `IllegalArgumentException` when attempting to apply a mask to a `Tile` whose `CellType` has no NoData defined. ([#409](https://github.com/locationtech/rasterframes/issues/384))
12+
* Add `rf_agg_approx_quantiles` function to compute cell quantiles across an entire column.
1213

1314
### 0.8.4
1415

pyrasterframes/src/main/python/pyrasterframes/rasterfunctions.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,22 @@ def rf_agg_approx_histogram(tile_col):
313313
return _apply_column_function('rf_agg_approx_histogram', tile_col)
314314

315315

316+
def rf_agg_approx_quantiles(tile_col, probabilities, relative_error=0.00001):
317+
"""
318+
Calculates the approximate quantiles of a tile column of a DataFrame.
319+
320+
:param tile_col: column to extract cells from.
321+
:param probabilities: a list of quantile probabilities. Each number must belong to [0, 1].
322+
For example 0 is the minimum, 0.5 is the median, 1 is the maximum.
323+
:param relative_error: The relative target precision to achieve (greater than or equal to 0). Default is 0.00001
324+
:return: An array of values approximately at the specified `probabilities`
325+
"""
326+
327+
_jfn = RFContext.active().lookup('rf_agg_approx_quantiles')
328+
_tile_col = _to_java_column(tile_col)
329+
return Column(_jfn(_tile_col, probabilities, relative_error))
330+
331+
316332
def rf_agg_stats(tile_col):
317333
"""Compute the full column aggregate floating point statistics"""
318334
return _apply_column_function('rf_agg_stats', tile_col)

pyrasterframes/src/main/python/tests/RasterFunctionsTests.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from pyspark.sql.functions import *
2626

2727
import numpy as np
28-
from numpy.testing import assert_equal
28+
from numpy.testing import assert_equal, assert_allclose
2929

3030
from unittest import skip
3131
from . import TestEnvironment
@@ -133,6 +133,12 @@ def test_aggregations(self):
133133
self.assertEqual(row['rf_agg_no_data_cells(tile)'], 1000)
134134
self.assertEqual(row['rf_agg_stats(tile)'].data_cells, row['rf_agg_data_cells(tile)'])
135135

136+
def test_agg_approx_quantiles(self):
137+
agg = self.rf.agg(rf_agg_approx_quantiles('tile', [0.1, 0.5, 0.9, 0.98]))
138+
result = agg.first()[0]
139+
# expected result from computing in external python process
140+
assert_allclose(result, np.array([7412., 7638., 7671., 7675.]))
141+
136142
def test_sql(self):
137143

138144
self.rf.createOrReplaceTempView("rf_test_sql")

0 commit comments

Comments
 (0)