diff --git a/docs/api/stats/sql.md b/docs/api/stats/sql.md index f39722d87f9..005e647a9fa 100644 --- a/docs/api/stats/sql.md +++ b/docs/api/stats/sql.md @@ -135,3 +135,117 @@ names in parentheses are python variable names - useSpheroid (use_spheroid) - whether to use a cartesian or spheroidal distance calculation. Default is false In both cases the output is the input DataFrame with the weights column added to each row. + +## Moran I + +Moran I is the spatial autocorrelation algorithm, which is using spatial +location and non-spatial attribute. When the value is close to the 1 it +means that there is spatial correlation, when it is close to 0 then the +correlation does not exist and data is randomly distributed. When the +MoranI autocorrelation value is close to -1 it means that there is negative +correlation. Negative correlation means that close values has dissimilar values. + +You can see spatial correlation values on the figure below + +- on the left there is negative correlation (-1) +- in the middle correlation is positive (1) +- on the right the correlation is close to zero and data is random. + +![moranI.png](../../image/moranI.png) + +Moran statistics can be used as the Scala/Java and Python functions. +As the input function requires weight DataFrame. You can create the +weight DataFrame using Apache Sedona weighting functions. You need +to keep in mind that your input has to have id column that uniquely identifies +the feature and value field. The required minimal schema for the MoranI Apache Sedona +function is: + +``` + |-- id: integer (nullable = true) + |-- value: double (nullable = true) + |-- weights: array (nullable = false) + | |-- element: struct (containsNull = false) + | | |-- neighbor: struct (nullable = false) + | | | |-- id: integer (nullable = true) + | | | |-- value: double (nullable = true) + | | |-- value: double (nullable = true) +``` + +You can manipulate the value column name and id using function parameters. + +To use the [Apache Sedona weight functions](#adddistancebandcolumn) you need to pass the id column and value column to kept parameters. + +=== "Scala" + + ```scala + val weights = Weighting.addDistanceBandColumn( + positiveCorrelationFrame, + 1.0, + savedAttributes = Seq("id", "value") + ) + + val moranResult = Moran.getGlobal(weights, idColumn = "id") + + // result fields + moranResult.getPNorm + moranResult.getI + moranResult.getZNorm + ``` + +=== "Python" + + ```python + from sedona.spark.stats.autocorrelation.moran import Moran + from sedona.spark.stats.weighting import add_binary_distance_band_column + + result = add_binary_distance_band_column( + df, + 1.0, + saved_attributes=["id", "value"] + ) + + moran_i_result = Moran.get_global(result) + + ## result fields + moran_i_result.p_norm + moran_i_result.i + moran_i_result.z_norm + ``` + +In the result you get the Z norm, P norm and Moran I value. + +The full signatures of the functions + +=== "Scala" + + ```scala + def getGlobal( + dataframe: DataFrame, + twoTailed: Boolean = true, + idColumn: String = ID_COLUMN, + valueColumnName: String = VALUE_COLUMN): MoranResult + + // java interface + public interface MoranResult { + public double getI(); + public double getPNorm(); + public double getZNorm(); + } + ``` + +=== "Python" + + ```python + def get_global( + df: DataFrame, + two_tailed: bool = True, + id_column: str = "id", + value_column: str = "value", + ) -> MoranResult + + @dataclass + class MoranResult: + i: float + p_norm: float + z_norm: float + ``` diff --git a/docs/image/moranI.png b/docs/image/moranI.png new file mode 100644 index 00000000000..157c2e36e67 Binary files /dev/null and b/docs/image/moranI.png differ diff --git a/python/sedona/spark/register/java_libs.py b/python/sedona/spark/register/java_libs.py index 675d788855a..8ba82f2c3fa 100644 --- a/python/sedona/spark/register/java_libs.py +++ b/python/sedona/spark/register/java_libs.py @@ -65,6 +65,7 @@ class SedonaJvmLib(Enum): st_predicates = "org.apache.spark.sql.sedona_sql.expressions.st_predicates" st_aggregates = "org.apache.spark.sql.sedona_sql.expressions.st_aggregates" SedonaContext = "org.apache.sedona.spark.SedonaContext" + Moran = "org.apache.sedona.stats.autocorrelation.Moran" @classmethod def from_str(cls, geo_lib: str) -> "SedonaJvmLib": diff --git a/python/sedona/spark/stats/autocorrelation/__init__.py b/python/sedona/spark/stats/autocorrelation/__init__.py new file mode 100644 index 00000000000..13a83393a91 --- /dev/null +++ b/python/sedona/spark/stats/autocorrelation/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/python/sedona/spark/stats/autocorrelation/moran.py b/python/sedona/spark/stats/autocorrelation/moran.py new file mode 100644 index 00000000000..506aed12b9a --- /dev/null +++ b/python/sedona/spark/stats/autocorrelation/moran.py @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from dataclasses import dataclass + +from pyspark.sql import DataFrame +from pyspark.sql import SparkSession + + +@dataclass +class MoranResult: + i: float + p_norm: float + z_norm: float + + +class Moran: + + @staticmethod + def get_global( + df: DataFrame, + two_tailed: bool = True, + id_column: str = "id", + value_column: str = "value", + ) -> MoranResult: + sedona = SparkSession.getActiveSession() + + _jvm = sedona._jvm + moran_result = ( + sedona._jvm.org.apache.sedona.stats.autocorrelation.Moran.getGlobal( + df._jdf, two_tailed, id_column, value_column + ) + ) + + return MoranResult( + i=moran_result.getI(), + p_norm=moran_result.getPNorm(), + z_norm=moran_result.getZNorm(), + ) diff --git a/python/sedona/spark/stats/hotspot_detection/getis_ord.py b/python/sedona/spark/stats/hotspot_detection/getis_ord.py index 7e6a0399507..b2b4d3ccc8c 100644 --- a/python/sedona/spark/stats/hotspot_detection/getis_ord.py +++ b/python/sedona/spark/stats/hotspot_detection/getis_ord.py @@ -21,7 +21,7 @@ Geographical Analysis, 24(3), 189-206. https://doi.org/10.1111/j.1538-4632.1992.tb00261.x """ -from pyspark.sql import Column, DataFrame, SparkSession +from pyspark.sql import DataFrame, SparkSession # todo change weights and x type to string @@ -59,7 +59,7 @@ def g_local( sedona = SparkSession.getActiveSession() result_df = sedona._jvm.org.apache.sedona.stats.hotspotDetection.GetisOrd.gLocal( - dataframe, x, weights, permutations, star, island_weight + dataframe._jdf, x, weights, permutations, star, island_weight ) return DataFrame(result_df, sedona) diff --git a/python/sedona/spark/stats/weighting.py b/python/sedona/spark/stats/weighting.py index 05cd08db4f9..d12639255e1 100644 --- a/python/sedona/spark/stats/weighting.py +++ b/python/sedona/spark/stats/weighting.py @@ -60,18 +60,21 @@ def add_distance_band_column( """ sedona = SparkSession.getActiveSession() - return sedona._jvm.org.apache.sedona.stats.Weighting.addDistanceBandColumn( - dataframe._jdf, - float(threshold), - binary, - float(alpha), - include_zero_distance_neighbors, - include_self, - float(self_weight), - geometry, - use_spheroid, - saved_attributes, - result_name, + return DataFrame( + sedona._jvm.org.apache.sedona.stats.Weighting.addDistanceBandColumnPython( + dataframe._jdf, + float(threshold), + binary, + float(alpha), + include_zero_distance_neighbors, + include_self, + float(self_weight), + geometry, + use_spheroid, + saved_attributes, + result_name, + ), + sedona, ) @@ -110,15 +113,21 @@ def add_binary_distance_band_column( """ sedona = SparkSession.getActiveSession() - return sedona._jvm.org.apache.sedona.stats.Weighting.addBinaryDistanceBandColumn( - dataframe._jdf, - float(threshold), - include_zero_distance_neighbors, - include_self, - geometry, - use_spheroid, - saved_attributes, - result_name, + return DataFrame( + sedona._jvm.org.apache.sedona.stats.Weighting.addDistanceBandColumnPython( + dataframe._jdf, + float(threshold), + True, + float(-1.0), + include_zero_distance_neighbors, + include_self, + float(1.0), + geometry, + use_spheroid, + saved_attributes, + result_name, + ), + sedona, ) @@ -161,15 +170,19 @@ def add_weighted_distance_band_column( """ sedona = SparkSession.getActiveSession() - return sedona._jvm.org.apache.sedona.stats.Weighting.addBinaryDistanceBandColumn( - dataframe._jdf, - float(threshold), - float(alpha), - include_zero_distance_neighbors, - include_self, - float(self_weight), - geometry, - use_spheroid, - saved_attributes, - result_name, + return DataFrame( + sedona._jvm.org.apache.sedona.stats.Weighting.addDistanceBandColumnPython( + dataframe._jdf, + float(threshold), + False, + alpha, + include_zero_distance_neighbors, + include_self, + self_weight, + geometry, + use_spheroid, + saved_attributes, + result_name, + ), + sedona, ) diff --git a/python/tests/stats/test_moran.py b/python/tests/stats/test_moran.py new file mode 100644 index 00000000000..d910d79f1cf --- /dev/null +++ b/python/tests/stats/test_moran.py @@ -0,0 +1,95 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from sedona.spark.stats.autocorrelation.moran import Moran +from sedona.spark.stats.weighting import add_binary_distance_band_column +from tests.test_base import TestBase + + +class TestMoran(TestBase): + + data = [ + (1, 1.0, 1.0, 8.5), + (2, 1.5, 1.2, 8.2), + (3, 1.3, 1.8, 8.7), + (4, 1.7, 1.6, 7.9), + (5, 4.0, 1.5, 6.2), + (6, 4.2, 1.7, 6.5), + (7, 4.5, 1.3, 5.9), + (8, 4.7, 1.8, 6.0), + (9, 1.8, 4.3, 3.1), + (10, 1.5, 4.5, 3.4), + (11, 1.2, 4.7, 3.0), + (12, 1.6, 4.2, 3.3), + (13, 1.9, 4.8, 2.8), + (14, 4.3, 4.2, 1.2), + (15, 4.5, 4.5, 1.5), + (16, 4.7, 4.8, 1.0), + (17, 4.1, 4.6, 1.3), + (18, 4.8, 4.3, 1.1), + (19, 4.2, 4.9, 1.4), + (20, 4.6, 4.1, 1.6), + ] + + def test_moran_integration(self): + df = ( + self.spark.createDataFrame(self.data) + .selectExpr("_1 as id", "_2 AS x", "_3 AS y", "_4 AS value") + .selectExpr("id", "ST_MakePoint(x, y) AS geometry", "value") + ) + + result = add_binary_distance_band_column( + df, 1.0, saved_attributes=["id", "value"] + ) + + moran_i_result = Moran.get_global(result) + + assert 0 < moran_i_result.p_norm < 0.00001 + assert 0.9614 < moran_i_result.i < 0.9615 + assert 7.7523 < moran_i_result.z_norm < 7.7524 + + two_tailed_result = Moran.get_global(result, False) + assert 0 < two_tailed_result.p_norm < 0.00001 + assert 0.9614 < two_tailed_result.i < 0.9615 + assert 7.7523 < two_tailed_result.z_norm < 7.7524 + + def test_moran_with_different_column_names(self): + df = ( + self.spark.createDataFrame(self.data) + .selectExpr("_1 as index", "_2 AS x", "_3 AS y", "_4 AS feature_value") + .selectExpr("index", "ST_MakePoint(x, y) AS geometry", "feature_value") + ) + + result = add_binary_distance_band_column( + df, threshold=1.0, saved_attributes=["index", "feature_value"] + ) + + moran_i_result = Moran.get_global( + result, id_column="index", value_column="feature_value" + ) + + assert 0 < moran_i_result.p_norm < 0.00001 + assert 0.9614 < moran_i_result.i < 0.9615 + assert 7.7523 < moran_i_result.z_norm < 7.7524 + + two_tailed_result = Moran.get_global( + result, id_column="index", value_column="feature_value", two_tailed=False + ) + + assert 0 < two_tailed_result.p_norm < 0.00001 + assert 0.9614 < two_tailed_result.i < 0.9615 + assert 7.7523 < two_tailed_result.z_norm < 7.7524 diff --git a/spark/common/src/main/java/org/apache/sedona/stats/autocorrelation/MoranResult.java b/spark/common/src/main/java/org/apache/sedona/stats/autocorrelation/MoranResult.java new file mode 100644 index 00000000000..1d158c7bf54 --- /dev/null +++ b/spark/common/src/main/java/org/apache/sedona/stats/autocorrelation/MoranResult.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.stats.autocorrelation; + +public class MoranResult { + private final double i; + private final double pNorm; + private final double zNorm; + + public MoranResult(double i, double pNorm, double zNorm) { + this.i = i; + this.pNorm = pNorm; + this.zNorm = zNorm; + } + + public double getI() { + return i; + } + + public double getPNorm() { + return pNorm; + } + + public double getZNorm() { + return zNorm; + } +} diff --git a/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala b/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala index 77136742618..d404f2c2dbc 100644 --- a/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala +++ b/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala @@ -22,6 +22,7 @@ import org.apache.sedona.util.DfUtils.getGeometryColumnName import org.apache.spark.sql.functions._ import org.apache.spark.sql.sedona_sql.expressions.st_functions.{ST_Distance, ST_DistanceSpheroid} import org.apache.spark.sql.{Column, DataFrame} +import scala.collection.JavaConverters._ object Weighting { @@ -255,4 +256,34 @@ object Weighting { savedAttributes = savedAttributes, resultName = resultName) + def addDistanceBandColumnPython( + dataframe: DataFrame, + threshold: Double, + binary: Boolean = true, + alpha: Double = -1.0, + includeZeroDistanceNeighbors: Boolean = false, + includeSelf: Boolean = false, + selfWeight: Double = 1.0, + geometry: String = null, + useSpheroid: Boolean = false, + savedAttributes: java.util.ArrayList[String] = null, + resultName: String = "weights"): DataFrame = { + + val savedAttributesScala = + if (savedAttributes != null) savedAttributes.asScala.toSeq + else null + + addDistanceBandColumn( + dataframe, + threshold, + binary, + alpha, + includeZeroDistanceNeighbors, + includeSelf, + selfWeight, + geometry, + useSpheroid, + savedAttributesScala, + resultName) + } } diff --git a/spark/common/src/main/scala/org/apache/sedona/stats/autocorrelation/Moran.scala b/spark/common/src/main/scala/org/apache/sedona/stats/autocorrelation/Moran.scala new file mode 100644 index 00000000000..8e38cfdd0b0 --- /dev/null +++ b/spark/common/src/main/scala/org/apache/sedona/stats/autocorrelation/Moran.scala @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.stats.autocorrelation + +import org.apache.commons.math3.distribution.NormalDistribution +import org.apache.sedona.stats.autocorrelation.MoranResult +import org.apache.spark.sql.{DataFrame, functions} +import org.apache.spark.sql.functions.{col, explode, pow} + +object Moran { + private val ID_COLUMN = "id" + private val VALUE_COLUMN = "value" + + private def normSf(x: Double, mean: Double = 0.0, stdDev: Double = 1.0): Double = { + val normalDist = new NormalDistribution(mean, stdDev) + 1.0 - normalDist.cumulativeProbability(x) + } + + private def normCdf(x: Double, mean: Double = 0.0, stdDev: Double = 1.0): Double = { + val normalDist = new NormalDistribution(mean, stdDev) + normalDist.cumulativeProbability(x) + } + + def getGlobal( + dataframe: DataFrame, + twoTailed: Boolean = true, + idColumn: String = ID_COLUMN, + valueColumnName: String = VALUE_COLUMN): MoranResult = { + val spark = dataframe.sparkSession + import spark.implicits._ + + val data = dataframe + .selectExpr(s"avg($valueColumnName)", "count(*)") + .as[(Double, Long)] + .head() + + val yMean = data._1 + + val n = data._2 + + val explodedWeights = dataframe + .select(col(idColumn), explode(col("weights")).alias("col")) + .select( + $"$idColumn".alias("id"), + $"col.neighbor.$idColumn".alias("n_id"), + $"col.value".alias("weight_value")) + + val s1Data = explodedWeights + .alias("left") + .join( + explodedWeights.alias("right"), + $"left.n_id" === $"right.id" && $"right.n_id" === $"left.id") + .select( + $"left.id", + $"right.weight_value".alias("b_weight_value"), + pow(($"right.weight_value" + $"left.weight_value"), 2).alias("s1_comp"), + $"left.weight_value".alias("a_weight")) + + val sStats = s1Data + .selectExpr( + "CAST(sum(s1_comp)/2 AS DOUBLE) AS s1_comp_sum", + "CAST(sum(a_weight) AS DOUBLE) AS a_weight_sum", + "CAST(sum(b_weight_value) AS DOUBLE) AS b_weight_value_sum") + .as[(Double, Double, Double)] + .head() + + val s1 = sStats._1 + + val inumData = dataframe + .selectExpr( + s"$idColumn AS id", + s"$valueColumnName AS value", + f"$valueColumnName - ${yMean} AS z", + f"transform(weights, w -> struct(w.neighbor.$idColumn AS id, w.value AS w, w.neighbor.$valueColumnName, w.neighbor.$valueColumnName - ${yMean} AS z)) AS weight") + .selectExpr( + "z", + "AGGREGATE(transform(weight, x-> x.z*x.w), CAST(0.0 AS DOUBLE), (acc, x) -> acc + x) AS ZL", + "AGGREGATE(transform(weight, x-> x.w), CAST(0.0 AS DOUBLE), (acc, x) -> acc + x) AS w_sum", + "AGGREGATE(transform(weight, x-> x.w), CAST(0.0 AS DOUBLE), (acc, x) -> acc + x) AS w_sq_sum", + "z * z AS z2ss_comp") + .selectExpr("*", "(z * zl) AS inum_comp") + .selectExpr("sum(inum_comp)", "sum(w_sum)", "sum(z2ss_comp)") + + val s2Data = s1Data + .groupBy("id") + .agg( + functions.sum("b_weight_value").alias("s_b_weight_value"), + functions.sum("a_weight").alias("s_a_weight")) + .selectExpr("pow(s_b_weight_value + s_a_weight, 2) AS summed") + + val s2 = s2Data.selectExpr("sum(summed)").as[Double].head() + val inumResult = inumData.as[(Double, Double, Double)].head() + val inum = inumResult._1 + + val s0 = inumResult._2 + + val z2ss = inumResult._3 + + val i = n / s0 * inum / z2ss + val ei = -1.0 / (n - 1) + val n2 = n * n + val s02 = s0 * s0 + val vNum = n2 * s1 - n * s2 + 3 * s02 + val vDen = (n - 1) * (n + 1) * s02 + val viNorm = (vNum / vDen) - math.pow(1.0 / (n - 1), 2) + val seINorm = math.pow(viNorm, 0.5) + val zNorm = (i - ei) / seINorm + + val pNorm = if (zNorm > 0) normSf(zNorm) else normCdf(zNorm) + val pNormFinal = if (twoTailed) pNorm * 2.0 else pNorm + + new MoranResult(i, pNormFinal, zNorm) + } +} diff --git a/spark/common/src/test/scala/org/apache/sedona/stats/autocorrelation/AutoCorrelationFixtures.scala b/spark/common/src/test/scala/org/apache/sedona/stats/autocorrelation/AutoCorrelationFixtures.scala new file mode 100644 index 00000000000..5209bc7d3f0 --- /dev/null +++ b/spark/common/src/test/scala/org/apache/sedona/stats/autocorrelation/AutoCorrelationFixtures.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.stats.autocorrelation + +import org.apache.sedona.spark.SedonaContext +import org.apache.sedona.sql.TestBaseScala + +import java.nio.file.Files + +trait AutoCorrelationFixtures { + this: TestBaseScala => + SedonaContext.create(sparkSession) + + val positiveAutoCorrelation = List( + (1, 1.0, 1.0, 8.5), + (2, 1.5, 1.2, 8.2), + (3, 1.3, 1.8, 8.7), + (4, 1.7, 1.6, 7.9), + (5, 4.0, 1.5, 6.2), + (6, 4.2, 1.7, 6.5), + (7, 4.5, 1.3, 5.9), + (8, 4.7, 1.8, 6.0), + (9, 1.8, 4.3, 3.1), + (10, 1.5, 4.5, 3.4), + (11, 1.2, 4.7, 3.0), + (12, 1.6, 4.2, 3.3), + (13, 1.9, 4.8, 2.8), + (14, 4.3, 4.2, 1.2), + (15, 4.5, 4.5, 1.5), + (16, 4.7, 4.8, 1.0), + (17, 4.1, 4.6, 1.3), + (18, 4.8, 4.3, 1.1), + (19, 4.2, 4.9, 1.4), + (20, 4.6, 4.1, 1.6)) + + val positiveCorrelationFrame = sparkSession + .createDataFrame(positiveAutoCorrelation) + .selectExpr("_1 as id", "_2 AS x", "_3 AS y", "_4 AS value") + .selectExpr("id", "ST_MakePoint(x, y) AS geometry", "value") + + val negativeCorrelationPoints = List( + (1, 1.0, 1.0, 8.5), + (2, 2.0, 1.0, 2.1), + (3, 3.0, 1.0, 8.2), + (4, 4.0, 1.0, 2.3), + (5, 5.0, 1.0, 8.7), + (6, 1.0, 2.0, 2.5), + (7, 2.0, 2.0, 8.1), + (8, 3.0, 2.0, 2.7), + (9, 4.0, 2.0, 8.3), + (10, 5.0, 2.0, 2.0), + (11, 1.0, 3.0, 8.6), + (12, 2.0, 3.0, 2.2), + (13, 3.0, 3.0, 8.4), + (14, 4.0, 3.0, 2.4), + (15, 5.0, 3.0, 8.0), + (16, 1.0, 4.0, 2.6), + (17, 2.0, 4.0, 8.8), + (18, 3.0, 4.0, 2.8), + (19, 4.0, 4.0, 8.9), + (20, 5.0, 4.0, 2.9)) + + val zeroCorrelationPoints = List( + (1, 3.75, 7.89, 2.58), + (2, 9.31, 4.25, 7.43), + (3, 5.12, 0.48, 5.96), + (4, 6.25, 1.74, 3.12), + (5, 1.47, 6.33, 8.26), + (6, 8.18, 9.57, 1.97), + (7, 2.64, 3.05, -6.42), + (8, 4.33, 5.88, 4.74), + (9, 7.91, 2.41, -10.13), + (10, 0.82, 8.76, 3.89), + (11, 9.70, 1.19, 100.0), + (12, 2.18, 7.54, 7.35), + (13, 5.47, 3.94, 2.16), + (14, 8.59, 6.78, -12.63), + (15, 3.07, 2.88, 4.27), + (16, 6.71, 9.12, 6.84), + (17, 1.34, 4.51, -25.0), + (18, 7.26, 5.29, -45.0), + (19, 4.89, 0.67, 1.59), + (20, 0.53, 8.12, 5.21)) + + val zeroCorrelationFrame = sparkSession + .createDataFrame(zeroCorrelationPoints) + .selectExpr("_1 as id", "_2 AS x", "_3 AS y", "_4 AS value") + .selectExpr("id", "ST_MakePoint(x, y) AS geometry", "value") + + val negativeCorrelationFrame = sparkSession + .createDataFrame(negativeCorrelationPoints) + .selectExpr("_1 as id", "_2 AS x", "_3 AS y", "_4 AS value") + .selectExpr("id", "ST_MakePoint(x, y) AS geometry", "value") +} diff --git a/spark/common/src/test/scala/org/apache/sedona/stats/autocorrelation/MoranTest.scala b/spark/common/src/test/scala/org/apache/sedona/stats/autocorrelation/MoranTest.scala new file mode 100644 index 00000000000..f26f79197b2 --- /dev/null +++ b/spark/common/src/test/scala/org/apache/sedona/stats/autocorrelation/MoranTest.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.stats.autocorrelation + +import org.apache.sedona.sql.TestBaseScala +import org.apache.sedona.stats.Weighting +import org.apache.sedona.stats.autocorrelation.Moran +import org.apache.spark.sql.functions.expr + +class MoranTest extends TestBaseScala with AutoCorrelationFixtures { + describe("Moran's I") { + it("correlation exists") { + val weights = Weighting + .addDistanceBandColumn( + positiveCorrelationFrame, + 1.0, + savedAttributes = Seq("id", "value")) + .withColumn( + "weights", + expr("transform(weights, w -> struct(w.neighbor, w.value/size(weights) AS value))")) + + weights.cache().count() + + val moranResult = Moran.getGlobal(weights, idColumn = "id") + + assert(moranResult.getPNorm < 0.0001) + assert(moranResult.getI > 0.99) + } + + it("different id and value column names") { + val weights = Weighting + .addDistanceBandColumn( + positiveCorrelationFrame + .selectExpr("id AS index", "value as feature_value", "geometry"), + 1.0, + savedAttributes = Seq("index", "feature_value")) + .withColumn( + "weights", + expr("transform(weights, w -> struct(w.neighbor, w.value/size(weights) AS value))")) + + weights.cache().count() + + val moranResult = + Moran.getGlobal(weights, idColumn = "index", valueColumnName = "feature_value") + + assert(moranResult.getPNorm < 0.0001) + assert(moranResult.getI > 0.99) + } + + it("correlation is negative") { + val weights = Weighting + .addDistanceBandColumn( + negativeCorrelationFrame, + 1.0, + savedAttributes = Seq("id", "value")) + .withColumn( + "weights", + expr("transform(weights, w -> struct(w.neighbor, w.value/size(weights) AS value))")) + + weights.cache().count() + + val moranResult = Moran.getGlobal(weights) + + assert(moranResult.getPNorm < 0.0001) + assert(moranResult.getI < -0.99) + assert(moranResult.getI > -1) + } + + it("zero correlation exists") { + val weights = Weighting + .addDistanceBandColumn(zeroCorrelationFrame, 2.0, savedAttributes = Seq("id", "value")) + .withColumn( + "weights", + expr("transform(weights, w -> struct(w.neighbor, w.value/size(weights) AS value))")) + + weights.cache().count() + + val moranResult = Moran.getGlobal(weights) + + assert(moranResult.getPNorm < 0.44 && moranResult.getPNorm > 0.43) + assert(moranResult.getI < 0.16 && moranResult.getI > 0.15) + } + } +}