Skip to content

Commit 456914d

Browse files
committed
Add DataFrame tileStats extension
Initial implementation with quantiles Signed-off-by: Jason T. Brown <jason@astraea.earth>
1 parent d8ea781 commit 456914d

4 files changed

Lines changed: 204 additions & 0 deletions

File tree

core/src/main/scala/org/locationtech/rasterframes/extensions/DataFrameMethods.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,9 @@ trait DataFrameMethods[DF <: DataFrame] extends MethodExtensions[DF] with Metada
155155
def withPrefixedColumnNames(prefix: String): DF =
156156
self.columns.foldLeft(self)((df, c) => df.withColumnRenamed(c, s"$prefix$c").asInstanceOf[DF])
157157

158+
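/** Returns a [[RasterFrameStatFunctions]] wrapping this DataFrame, exposing tile-aware statistics such as approximate quantiles. */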
159+
def tileStat(): RasterFrameStatFunctions = new RasterFrameStatFunctions(self)
160+
158161
/**
159162
* Performs a left join on the dataframe `right` to this one, reprojecting and merging tiles as necessary.
160163
* The operation is logically a "left outer" join, with the left side also determining the target CRS and extents.
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package org.locationtech.rasterframes.extensions
2+
3+
import org.locationtech.rasterframes.stats._
4+
import org.apache.spark.sql.DataFrame
5+
import org.apache.spark.sql.functions.col
6+
7+
final class RasterFrameStatFunctions private[rasterframes](df: DataFrame) {
8+
9+
/**
10+
* Calculates the approximate quantiles of the cell values in a tile column of a DataFrame.
11+
*
12+
* The result of this algorithm has the following deterministic bound:
13+
* If the DataFrame has N elements and if we request the quantile at probability `p` up to error
14+
* `err`, then the algorithm will return a sample `x` from the DataFrame so that the *exact* rank
15+
* of `x` is close to (p * N).
16+
* More precisely,
17+
*
18+
* {{{
19+
* floor((p - err) * N) <= rank(x) <= ceil((p + err) * N)
20+
* }}}
21+
*
22+
* This method implements a variation of the Greenwald-Khanna algorithm (with some speed
23+
* optimizations).
24+
* The algorithm was first presented in <a href="http://dx.doi.org/10.1145/375663.375670">
25+
* Space-efficient Online Computation of Quantile Summaries</a> by Greenwald and Khanna.
26+
*
27+
* @param col the name of the tile column
28+
* @param probabilities a list of quantile probabilities
29+
* Each number must belong to [0, 1].
30+
* For example 0 is the minimum, 0.5 is the median, 1 is the maximum.
31+
* @param relativeError The relative target precision to achieve (greater than or equal to 0).
32+
* If set to zero, the exact quantiles are computed, which could be very expensive.
33+
* Note that values greater than 1 are accepted but give the same result as 1.
34+
* @return the approximate quantiles at the given probabilities
35+
*
36+
* @note null tiles and NaN cell values are skipped before calculation. If
37+
* the dataframe is empty or the column only contains null or NaN, an empty array is returned.
38+
*
39+
* @since 2.0.0
40+
*/
41+
def approxTileQuantile(
42+
col: String,
43+
probabilities: Array[Double],
44+
relativeError: Double): Array[Double] = {
45+
approxTileQuantile(Array(col), probabilities, relativeError).head
46+
}
47+
48+
/**
49+
* Calculates the approximate quantiles of the cell values in tile columns of a DataFrame.
50+
* @see the single-column `approxTileQuantile(col: String, ...)` overload for a detailed description.
51+
*
52+
* @param cols the names of the tile columns
53+
* @param probabilities a list of quantile probabilities
54+
* Each number must belong to [0, 1].
55+
* For example 0 is the minimum, 0.5 is the median, 1 is the maximum.
56+
* @param relativeError The relative target precision to achieve (greater than or equal to 0).
57+
* If set to zero, the exact quantiles are computed, which could be very expensive.
58+
* Note that values greater than 1 are accepted but give the same result as 1.
59+
* @return the approximate quantiles at the given probabilities of each column
60+
*
61+
* @note null tiles and NaN cell values are ignored before calculation. For
62+
* columns only containing null or NaN values, an empty array is returned.
63+
*
64+
*/
65+
def approxTileQuantile(
66+
cols: Array[String],
67+
probabilities: Array[Double],
68+
relativeError: Double): Array[Array[Double]] = {
69+
multipleApproxQuantiles(
70+
df.select(cols.map(col): _*),
71+
cols,
72+
probabilities,
73+
relativeError).map(_.toArray).toArray
74+
}
75+
76+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package org.locationtech.rasterframes
2+
3+
import geotrellis.raster.Tile
4+
import org.locationtech.rasterframes.TileType
5+
import org.locationtech.rasterframes.expressions.DynamicExtractors._
6+
import org.apache.spark.sql.{Column, DataFrame, Row}
7+
import org.apache.spark.sql.catalyst.expressions.Cast
8+
import org.apache.spark.sql.catalyst.util.QuantileSummaries
9+
import org.apache.spark.sql.types.{DoubleType, NumericType}
10+
import org.locationtech.rasterframes.expressions.accessors.ExtractTile
11+
12+
13+
package object stats {
14+
15+
def multipleApproxQuantiles(df: DataFrame,
16+
cols: Seq[String],
17+
probabilities: Seq[Double],
18+
relativeError: Double): Seq[Seq[Double]] = {
19+
require(relativeError >= 0,
20+
s"Relative Error must be non-negative but got $relativeError")
21+
22+
val columns: Seq[Column] = cols.map { colName =>
23+
val field = df.schema(colName)
24+
25+
require(tileExtractor.isDefinedAt(field.dataType),
26+
s"Quantile calculation for column $colName with data type ${field.dataType}" +
27+
" is not supported; it must be Tile-like.")
28+
ExtractTile(new Column(colName))
29+
}
30+
31+
val emptySummaries = Array.fill(cols.size)(
32+
new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, relativeError))
33+
34+
def apply(summaries: Array[QuantileSummaries], row: Row): Array[QuantileSummaries] = {
35+
var i = 0
36+
while (i < summaries.length) {
37+
if (!row.isNullAt(i)) {
38+
val t: Tile = row.getAs[Tile](i)
39+
// now insert all the tile values into the summary for this column
40+
t.foreachDouble(v =>
41+
if (!v.isNaN) summaries(i) = summaries(i).insert(v)
42+
)
43+
}
44+
i += 1 // next column
45+
}
46+
summaries
47+
}
48+
49+
def merge(
50+
sum1: Array[QuantileSummaries],
51+
sum2: Array[QuantileSummaries]): Array[QuantileSummaries] = {
52+
sum1.zip(sum2).map { case (s1, s2) => s1.compress().merge(s2.compress()) }
53+
}
54+
val summaries = df.select(columns: _*).rdd.treeAggregate(emptySummaries)(apply, merge)
55+
56+
summaries.map { summary => probabilities.flatMap(summary.query) }
57+
}
58+
59+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* This software is licensed under the Apache 2 license, quoted below.
3+
*
4+
* Copyright 2018 Astraea, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
7+
* use this file except in compliance with the License. You may obtain a copy of
8+
* the License at
9+
*
10+
* [http://www.apache.org/licenses/LICENSE-2.0]
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15+
* License for the specific language governing permissions and limitations under
16+
* the License.
17+
*
18+
* SPDX-License-Identifier: Apache-2.0
19+
*
20+
*/
21+
22+
package org.locationtech.rasterframes
23+
24+
import org.apache.spark.sql.functions.col
25+
26+
import org.locationtech.rasterframes._
27+
import org.locationtech.rasterframes.RasterFunctions
28+
29+
class RasterFramesStatsSpec extends TestEnvironment with TestData {
30+
31+
describe("DataFrame.tileStats extension methods") {
32+
33+
val df = TestData.sampleGeoTiff.toDF()
34+
.withColumn("tilePlus2", rf_local_add(col("tile"), 2))
35+
36+
it("should compute approx percentiles for a single tile col"){
37+
38+
val result = df.tileStat().approxTileQuantile(
39+
"tile",
40+
Array(0.10, 0.50, 0.90),
41+
0.00001
42+
)
43+
44+
result.length should be (3)
45+
46+
// computing externally with numpy we arrive at 7963, 10068, 12160 for these quantiles
47+
result should contain inOrderOnly (7963.0, 10068.0, 12160.0)
48+
}
49+
50+
it("should compute approx percentiles for many tile cols"){
51+
val result = df.tileStat().approxTileQuantile(
52+
Array("tile", "tilePlus2"),
53+
Array(0.25, 0.75),
54+
0.00001
55+
)
56+
result.length should be (2)
57+
// nested inside is another array of length 2 for each p
58+
result.foreach{c => c.length should be (2)}
59+
60+
result.head should contain inOrderOnly (8701, 11261)
61+
result.tail.head should contain inOrderOnly (8703, 11263)
62+
}
63+
64+
}
65+
66+
}

0 commit comments

Comments
 (0)