From e9e20639ac0cbdf16e3c9996eedb5f7abd830ff7 Mon Sep 17 00:00:00 2001 From: pawelkocinski Date: Wed, 4 Jun 2025 23:44:41 +0200 Subject: [PATCH 01/10] SEDONA-738 Add moran i autocorrelation. --- python/sedona/spark/register/java_libs.py | 1 + .../spark/stats/autocorrelation/__init__.py | 16 +++ .../spark/stats/autocorrelation/moran.py | 49 +++++++ python/sedona/spark/stats/weighting.py | 73 +++++----- python/tests/stats/test_moran.py | 63 +++++++++ .../stats/autocorrelation/MoranResult.java | 43 ++++++ .../sedona/stats/autocorelation/Moran.scala | 128 ++++++++++++++++++ .../AutoCorrelationFixtures.scala | 110 +++++++++++++++ .../stats/autocorellation/MoranTest.scala | 80 +++++++++++ 9 files changed, 531 insertions(+), 32 deletions(-) create mode 100644 python/sedona/spark/stats/autocorrelation/__init__.py create mode 100644 python/sedona/spark/stats/autocorrelation/moran.py create mode 100644 python/tests/stats/test_moran.py create mode 100644 spark/common/src/main/java/org/apache/sedona/stats/autocorrelation/MoranResult.java create mode 100644 spark/common/src/main/scala/org/apache/sedona/stats/autocorelation/Moran.scala create mode 100644 spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/AutoCorrelationFixtures.scala create mode 100644 spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/MoranTest.scala diff --git a/python/sedona/spark/register/java_libs.py b/python/sedona/spark/register/java_libs.py index 675d788855a..d9f76831b4b 100644 --- a/python/sedona/spark/register/java_libs.py +++ b/python/sedona/spark/register/java_libs.py @@ -65,6 +65,7 @@ class SedonaJvmLib(Enum): st_predicates = "org.apache.spark.sql.sedona_sql.expressions.st_predicates" st_aggregates = "org.apache.spark.sql.sedona_sql.expressions.st_aggregates" SedonaContext = "org.apache.sedona.spark.SedonaContext" + Moran = "org.apache.sedona.stats.autocorelation.Moran" @classmethod def from_str(cls, geo_lib: str) -> "SedonaJvmLib": diff --git 
a/python/sedona/spark/stats/autocorrelation/__init__.py b/python/sedona/spark/stats/autocorrelation/__init__.py new file mode 100644 index 00000000000..13a83393a91 --- /dev/null +++ b/python/sedona/spark/stats/autocorrelation/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/python/sedona/spark/stats/autocorrelation/moran.py b/python/sedona/spark/stats/autocorrelation/moran.py new file mode 100644 index 00000000000..9b7717a0f7f --- /dev/null +++ b/python/sedona/spark/stats/autocorrelation/moran.py @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +from dataclasses import dataclass + +from pyspark.sql import DataFrame +from pyspark.sql import SparkSession + + +@dataclass +class MoranResult: + i: float + p_norm: float + z_norm: float + + +class Moran: + + @staticmethod + def get_global( + df: DataFrame, two_tailed: bool = True, id_column: str = "id" + ) -> MoranResult: + sedona = SparkSession.getActiveSession() + + _jvm = sedona._jvm + moran_result = ( + sedona._jvm.org.apache.sedona.stats.autocorelation.Moran.getGlobal( + df._jdf, two_tailed, id_column + ) + ) + + return MoranResult( + i=moran_result.getI(), + p_norm=moran_result.getPNorm(), + z_norm=moran_result.getZNorm(), + ) diff --git a/python/sedona/spark/stats/weighting.py b/python/sedona/spark/stats/weighting.py index 05cd08db4f9..68394d75f0f 100644 --- a/python/sedona/spark/stats/weighting.py +++ b/python/sedona/spark/stats/weighting.py @@ -60,18 +60,21 @@ def add_distance_band_column( """ sedona = SparkSession.getActiveSession() - return sedona._jvm.org.apache.sedona.stats.Weighting.addDistanceBandColumn( - dataframe._jdf, - float(threshold), - binary, - float(alpha), - include_zero_distance_neighbors, - include_self, - float(self_weight), - geometry, - use_spheroid, - saved_attributes, - result_name, + return DataFrame( + sedona._jvm.org.apache.sedona.stats.Weighting.addDistanceBandColumn( + dataframe._jdf, + float(threshold), + binary, + float(alpha), + include_zero_distance_neighbors, + include_self, + float(self_weight), + geometry, + use_spheroid, + saved_attributes, + result_name, + ), + sedona._jsparkSession, ) @@ -110,15 +113,18 @@ def add_binary_distance_band_column( """ sedona = SparkSession.getActiveSession() - return sedona._jvm.org.apache.sedona.stats.Weighting.addBinaryDistanceBandColumn( - dataframe._jdf, - float(threshold), - include_zero_distance_neighbors, - include_self, - geometry, - use_spheroid, - saved_attributes, - 
result_name, + return DataFrame( + sedona._jvm.org.apache.sedona.stats.Weighting.addBinaryDistanceBandColumn( + dataframe._jdf, + float(threshold), + include_zero_distance_neighbors, + include_self, + geometry, + use_spheroid, + saved_attributes, + result_name, + ), + sedona._jsparkSession, ) @@ -161,15 +167,18 @@ def add_weighted_distance_band_column( """ sedona = SparkSession.getActiveSession() - return sedona._jvm.org.apache.sedona.stats.Weighting.addBinaryDistanceBandColumn( - dataframe._jdf, - float(threshold), - float(alpha), - include_zero_distance_neighbors, - include_self, - float(self_weight), - geometry, - use_spheroid, - saved_attributes, - result_name, + return DataFrame( + sedona._jvm.org.apache.sedona.stats.Weighting.addBinaryDistanceBandColumn( + dataframe._jdf, + float(threshold), + float(alpha), + include_zero_distance_neighbors, + include_self, + float(self_weight), + geometry, + use_spheroid, + saved_attributes, + result_name, + ), + sedona._jsparkSession, ) diff --git a/python/tests/stats/test_moran.py b/python/tests/stats/test_moran.py new file mode 100644 index 00000000000..84b13ee379d --- /dev/null +++ b/python/tests/stats/test_moran.py @@ -0,0 +1,63 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from sedona.spark.stats.autocorrelation.moran import Moran, MoranResult +from sedona.spark.stats.weighting import add_binary_distance_band_column +from tests.test_base import TestBase + + +class TestMoran(TestBase): + + def test_moran_integration(self): + data = [ + (1, 1.0, 1.0, 8.5), + (2, 1.5, 1.2, 8.2), + (3, 1.3, 1.8, 8.7), + (4, 1.7, 1.6, 7.9), + (5, 4.0, 1.5, 6.2), + (6, 4.2, 1.7, 6.5), + (7, 4.5, 1.3, 5.9), + (8, 4.7, 1.8, 6.0), + (9, 1.8, 4.3, 3.1), + (10, 1.5, 4.5, 3.4), + (11, 1.2, 4.7, 3.0), + (12, 1.6, 4.2, 3.3), + (13, 1.9, 4.8, 2.8), + (14, 4.3, 4.2, 1.2), + (15, 4.5, 4.5, 1.5), + (16, 4.7, 4.8, 1.0), + (17, 4.1, 4.6, 1.3), + (18, 4.8, 4.3, 1.1), + (19, 4.2, 4.9, 1.4), + (20, 4.6, 4.1, 1.6), + ] + + df = ( + self.spark.createDataFrame(data) + .selectExpr("_1 as id", "_2 AS x", "_3 AS y", "_4 AS value") + .selectExpr("id", "ST_MakePoint(x, y) AS geometry", "value") + ) + + result = add_binary_distance_band_column(df, 1.0) + + assert Moran.get_global(result) == MoranResult( + i=0.9614304631460562, p_norm=9.103828801926284e-15, z_norm=7.752321966127421 + ) + + assert Moran.get_global(result, False) == MoranResult( + i=0.9614304631460562, p_norm=4.551914400963142e-15, z_norm=7.752321966127421 + ) diff --git a/spark/common/src/main/java/org/apache/sedona/stats/autocorrelation/MoranResult.java b/spark/common/src/main/java/org/apache/sedona/stats/autocorrelation/MoranResult.java new file mode 100644 index 00000000000..1d158c7bf54 --- /dev/null +++ b/spark/common/src/main/java/org/apache/sedona/stats/autocorrelation/MoranResult.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.stats.autocorrelation; + +public class MoranResult { + private final double i; + private final double pNorm; + private final double zNorm; + + public MoranResult(double i, double pNorm, double zNorm) { + this.i = i; + this.pNorm = pNorm; + this.zNorm = zNorm; + } + + public double getI() { + return i; + } + + public double getPNorm() { + return pNorm; + } + + public double getZNorm() { + return zNorm; + } +} diff --git a/spark/common/src/main/scala/org/apache/sedona/stats/autocorelation/Moran.scala b/spark/common/src/main/scala/org/apache/sedona/stats/autocorelation/Moran.scala new file mode 100644 index 00000000000..7e06a069ddc --- /dev/null +++ b/spark/common/src/main/scala/org/apache/sedona/stats/autocorelation/Moran.scala @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.stats.autocorelation + +import org.apache.commons.math3.distribution.NormalDistribution +import org.apache.sedona.stats.autocorrelation.MoranResult +import org.apache.spark.sql.{DataFrame, functions} +import org.apache.spark.sql.functions.{col, explode, pow} + +object Moran { + private val ID_COLUMN = "id" + + private def normSf(x: Double, mean: Double = 0.0, stdDev: Double = 1.0): Double = { + val normalDist = new NormalDistribution(mean, stdDev) + 1.0 - normalDist.cumulativeProbability(x) + } + + private def normCdf(x: Double, mean: Double = 0.0, stdDev: Double = 1.0): Double = { + val normalDist = new NormalDistribution(mean, stdDev) + normalDist.cumulativeProbability(x) + } + + def getGlobal( + dataframe: DataFrame, + twoTailed: Boolean = true, + idColumn: String = ID_COLUMN): MoranResult = { + val spark = dataframe.sparkSession + import spark.implicits._ + + val data = dataframe + .selectExpr("avg(value)", "count(*)") + .as[(Double, Long)] + .head() + + val yMean = data._1 + + val n = data._2 + + val explodedWeights = dataframe + .select(col(idColumn), explode(col("weights")).alias("col")) + .select( + $"$idColumn".alias("id"), + $"col.neighbor.id".alias("n_id"), + $"col.value".alias("weight_value")) + + val s1Data = explodedWeights + .alias("left") + .join( + explodedWeights.alias("right"), + $"left.n_id" === $"right.id" && $"right.n_id" === $"left.id") + .select( + $"left.id", + $"right.weight_value".alias("b_weight_value"), + pow(($"right.weight_value" + $"left.weight_value"), 2).alias("s1_comp"), + $"left.weight_value".alias("a_weight")) + + val sStats = s1Data + .selectExpr( + "CAST(sum(s1_comp)/2 AS DOUBLE) AS s1_comp_sum", + "CAST(sum(a_weight) AS DOUBLE) AS a_weight_sum", + "CAST(sum(b_weight_value) AS DOUBLE) AS b_weight_value_sum") + .as[(Double, Double, Double)] + .head() + + val s1 = sStats._1 + + val inumData = 
dataframe + .selectExpr( + s"$idColumn AS id", + "value", + f"value - ${yMean} AS z", + f"transform(weights, w -> struct(w.neighbor.id AS id, w.value AS w, w.neighbor.value, w.neighbor.value - ${yMean} AS z)) AS weight") + .selectExpr( + "z", + "AGGREGATE(transform(weight, x-> x.z*x.w), CAST(0.0 AS DOUBLE), (acc, x) -> acc + x) AS ZL", + "AGGREGATE(transform(weight, x-> x.w), CAST(0.0 AS DOUBLE), (acc, x) -> acc + x) AS w_sum", + "AGGREGATE(transform(weight, x-> x.w), CAST(0.0 AS DOUBLE), (acc, x) -> acc + x) AS w_sq_sum", + "z * z AS z2ss_comp") + .selectExpr("*", "(z * zl) AS inum_comp") + .selectExpr("sum(inum_comp)", "sum(w_sum)", "sum(z2ss_comp)") + + val s2Data = s1Data + .groupBy("id") + .agg( + functions.sum("b_weight_value").alias("s_b_weight_value"), + functions.sum("a_weight").alias("s_a_weight")) + .selectExpr("pow(s_b_weight_value + s_a_weight, 2) AS summed") + + val s2 = s2Data.selectExpr("sum(summed)").as[Double].head() + val inumResult = inumData.as[(Double, Double, Double)].head() + val inum = inumResult._1 + + val s0 = inumResult._2 + + val z2ss = inumResult._3 + + val i = n / s0 * inum / z2ss + val ei = -1.0 / (n - 1) + val n2 = n * n + val s02 = s0 * s0 + val vNum = n2 * s1 - n * s2 + 3 * s02 + val vDen = (n - 1) * (n + 1) * s02 + val viNorm = (vNum / vDen) - math.pow(1.0 / (n - 1), 2) + val seINorm = math.pow(viNorm, 0.5) + val zNorm = (i - ei) / seINorm + + val pNorm = if (zNorm > 0) normSf(zNorm) else normCdf(zNorm) + val pNormFinal = if (twoTailed) pNorm * 2.0 else pNorm + + new MoranResult(i, pNormFinal, zNorm) + } +} diff --git a/spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/AutoCorrelationFixtures.scala b/spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/AutoCorrelationFixtures.scala new file mode 100644 index 00000000000..c87ae430f1e --- /dev/null +++ b/spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/AutoCorrelationFixtures.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.stats.autocorellation + +import org.apache.sedona.spark.SedonaContext +import org.apache.sedona.sql.TestBaseScala + +import java.nio.file.Files + +trait AutoCorrelationFixtures { + this: TestBaseScala => + SedonaContext.create(sparkSession) + + val positiveAutoCorrelation = List( + (1, 1.0, 1.0, 8.5), + (2, 1.5, 1.2, 8.2), + (3, 1.3, 1.8, 8.7), + (4, 1.7, 1.6, 7.9), + (5, 4.0, 1.5, 6.2), + (6, 4.2, 1.7, 6.5), + (7, 4.5, 1.3, 5.9), + (8, 4.7, 1.8, 6.0), + (9, 1.8, 4.3, 3.1), + (10, 1.5, 4.5, 3.4), + (11, 1.2, 4.7, 3.0), + (12, 1.6, 4.2, 3.3), + (13, 1.9, 4.8, 2.8), + (14, 4.3, 4.2, 1.2), + (15, 4.5, 4.5, 1.5), + (16, 4.7, 4.8, 1.0), + (17, 4.1, 4.6, 1.3), + (18, 4.8, 4.3, 1.1), + (19, 4.2, 4.9, 1.4), + (20, 4.6, 4.1, 1.6)) + + val positiveCorrelationFrame = sparkSession + .createDataFrame(positiveAutoCorrelation) + .selectExpr("_1 as id", "_2 AS x", "_3 AS y", "_4 AS value") + .selectExpr("id", "ST_MakePoint(x, y) AS geometry", "value") + + val negativeCorrelationPoints = List( + (1, 1.0, 1.0, 8.5), + (2, 2.0, 1.0, 2.1), + (3, 3.0, 1.0, 8.2), + (4, 4.0, 1.0, 2.3), + (5, 5.0, 1.0, 8.7), + (6, 1.0, 2.0, 2.5), + (7, 2.0, 2.0, 8.1), + (8, 3.0, 2.0, 2.7), + (9, 4.0, 2.0, 
8.3), + (10, 5.0, 2.0, 2.0), + (11, 1.0, 3.0, 8.6), + (12, 2.0, 3.0, 2.2), + (13, 3.0, 3.0, 8.4), + (14, 4.0, 3.0, 2.4), + (15, 5.0, 3.0, 8.0), + (16, 1.0, 4.0, 2.6), + (17, 2.0, 4.0, 8.8), + (18, 3.0, 4.0, 2.8), + (19, 4.0, 4.0, 8.9), + (20, 5.0, 4.0, 2.9)) + + val zeroCorrelationPoints = List( + (1, 3.75, 7.89, 2.58), + (2, 9.31, 4.25, 7.43), + (3, 5.12, 0.48, 5.96), + (4, 6.25, 1.74, 3.12), + (5, 1.47, 6.33, 8.26), + (6, 8.18, 9.57, 1.97), + (7, 2.64, 3.05, -6.42), + (8, 4.33, 5.88, 4.74), + (9, 7.91, 2.41, -10.13), + (10, 0.82, 8.76, 3.89), + (11, 9.70, 1.19, 100.0), + (12, 2.18, 7.54, 7.35), + (13, 5.47, 3.94, 2.16), + (14, 8.59, 6.78, -12.63), + (15, 3.07, 2.88, 4.27), + (16, 6.71, 9.12, 6.84), + (17, 1.34, 4.51, -25.0), + (18, 7.26, 5.29, -45.0), + (19, 4.89, 0.67, 1.59), + (20, 0.53, 8.12, 5.21)) + + val zeroCorrelationFrame = sparkSession + .createDataFrame(zeroCorrelationPoints) + .selectExpr("_1 as id", "_2 AS x", "_3 AS y", "_4 AS value") + .selectExpr("id", "ST_MakePoint(x, y) AS geometry", "value") + + val negativeCorrelationFrame = sparkSession + .createDataFrame(negativeCorrelationPoints) + .selectExpr("_1 as id", "_2 AS x", "_3 AS y", "_4 AS value") + .selectExpr("id", "ST_MakePoint(x, y) AS geometry", "value") +} diff --git a/spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/MoranTest.scala b/spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/MoranTest.scala new file mode 100644 index 00000000000..c350959f95c --- /dev/null +++ b/spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/MoranTest.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.stats.autocorellation + +import org.apache.sedona.sql.TestBaseScala +import org.apache.sedona.stats.Weighting +import org.apache.sedona.stats.autocorelation.Moran +import org.apache.spark.sql.functions.expr + +class MoranTest extends TestBaseScala with AutoCorrelationFixtures { + describe("Moran's I") { + it("correlation exists") { + val weights = Weighting + .addDistanceBandColumn( + positiveCorrelationFrame, + 1.0, + savedAttributes = Seq("id", "value")) + .withColumn( + "weights", + expr("transform(weights, w -> struct(w.neighbor, w.value/size(weights) AS value))")) + + weights.cache().count() + + val moranResult = Moran.getGlobal(weights, idColumn = "id") + + assert(moranResult.getPNorm < 0.0001) + assert(moranResult.getI > 0.99) + } + + it("correlation is negative") { + val weights = Weighting + .addDistanceBandColumn( + negativeCorrelationFrame, + 1.0, + savedAttributes = Seq("id", "value")) + .withColumn( + "weights", + expr("transform(weights, w -> struct(w.neighbor, w.value/size(weights) AS value))")) + + weights.cache().count() + + val moranResult = Moran.getGlobal(weights) + + assert(moranResult.getPNorm < 0.0001) + assert(moranResult.getI < -0.99) + assert(moranResult.getI > -1) + } + + it("zero correlation exists") { + val weights = Weighting + .addDistanceBandColumn(zeroCorrelationFrame, 2.0, savedAttributes = Seq("id", "value")) + .withColumn( + "weights", + expr("transform(weights, w -> struct(w.neighbor, w.value/size(weights) AS value))")) + + weights.cache().count() + + val moranResult = 
Moran.getGlobal(weights) + + assert(moranResult.getPNorm < 0.44 && moranResult.getPNorm > 0.43) + assert(moranResult.getI < 0.16 && moranResult.getI > 0.15) + } + } +} From 58b9a32b2053bc380b2058ece93c0c08f2d765e6 Mon Sep 17 00:00:00 2001 From: pawelkocinski Date: Thu, 5 Jun 2025 20:15:23 +0200 Subject: [PATCH 02/10] SEDONA-738 Fix unit tests. --- python/sedona/spark/stats/hotspot_detection/getis_ord.py | 4 ++-- python/sedona/spark/stats/weighting.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/python/sedona/spark/stats/hotspot_detection/getis_ord.py b/python/sedona/spark/stats/hotspot_detection/getis_ord.py index 7e6a0399507..b2b4d3ccc8c 100644 --- a/python/sedona/spark/stats/hotspot_detection/getis_ord.py +++ b/python/sedona/spark/stats/hotspot_detection/getis_ord.py @@ -21,7 +21,7 @@ Geographical Analysis, 24(3), 189-206. https://doi.org/10.1111/j.1538-4632.1992.tb00261.x """ -from pyspark.sql import Column, DataFrame, SparkSession +from pyspark.sql import DataFrame, SparkSession # todo change weights and x type to string @@ -59,7 +59,7 @@ def g_local( sedona = SparkSession.getActiveSession() result_df = sedona._jvm.org.apache.sedona.stats.hotspotDetection.GetisOrd.gLocal( - dataframe, x, weights, permutations, star, island_weight + dataframe._jdf, x, weights, permutations, star, island_weight ) return DataFrame(result_df, sedona) diff --git a/python/sedona/spark/stats/weighting.py b/python/sedona/spark/stats/weighting.py index 68394d75f0f..8475d0b4bea 100644 --- a/python/sedona/spark/stats/weighting.py +++ b/python/sedona/spark/stats/weighting.py @@ -74,7 +74,7 @@ def add_distance_band_column( saved_attributes, result_name, ), - sedona._jsparkSession, + sedona, ) @@ -124,7 +124,7 @@ def add_binary_distance_band_column( saved_attributes, result_name, ), - sedona._jsparkSession, + sedona, ) @@ -180,5 +180,5 @@ def add_weighted_distance_band_column( saved_attributes, result_name, ), - sedona._jsparkSession, + sedona, ) From 
07c7a64ae4250b684a58e3e72dd661f248c1910c Mon Sep 17 00:00:00 2001 From: pawelkocinski Date: Thu, 5 Jun 2025 23:13:44 +0200 Subject: [PATCH 03/10] SEDONA-738 Fix unit tests. --- docs/api/stats/sql.md | 116 ++++++++++++++++++ docs/image/moranI.png | Bin 0 -> 5422 bytes .../spark/stats/autocorrelation/moran.py | 7 +- python/sedona/spark/stats/weighting.py | 14 ++- python/tests/stats/test_moran.py | 92 +++++++++----- .../org/apache/sedona/stats/Weighting.scala | 35 ++++++ .../sedona/stats/autocorelation/Moran.scala | 16 ++- .../stats/autocorellation/MoranTest.scala | 20 +++ 8 files changed, 257 insertions(+), 43 deletions(-) create mode 100644 docs/image/moranI.png diff --git a/docs/api/stats/sql.md b/docs/api/stats/sql.md index f39722d87f9..1fcf8997818 100644 --- a/docs/api/stats/sql.md +++ b/docs/api/stats/sql.md @@ -135,3 +135,119 @@ names in parentheses are python variable names - useSpheroid (use_spheroid) - whether to use a cartesian or spheroidal distance calculation. Default is false In both cases the output is the input DataFrame with the weights column added to each row. + +# MoranI + +Moran's I is a spatial autocorrelation statistic that uses both the spatial +location and a non-spatial attribute of each feature. When the value is close to 1, +there is positive spatial correlation; when it is close to 0, there is +no spatial correlation and the data is randomly distributed. When the +Moran's I value is close to -1, there is negative +correlation. Negative correlation means that neighboring features have dissimilar values. + +You can see the spatial correlation values in the figure below: + +- on the left there is negative correlation (-1) +- in the middle correlation is positive (1) +- on the right the correlation is close to zero and data is random. + +![moranI.png](../../image/moranI.png) + +The Moran statistic is available as Scala/Java and Python functions. +The function requires a weight DataFrame as its input. 
You can create the +weight DataFrame using Apache Sedona weighting functions. Keep +in mind that your input must have an id column that uniquely identifies +each feature, and a value column. The minimal required schema for the MoranI Apache Sedona +function is: + +``` + |-- id: integer (nullable = true) + |-- value: double (nullable = true) + |-- weights: array (nullable = false) + | |-- element: struct (containsNull = false) + | | |-- neighbor: struct (nullable = false) + | | | |-- id: integer (nullable = true) + | | | |-- value: double (nullable = true) + | | |-- value: double (nullable = true) +``` + +You can change the id and value column names using the function parameters. + +When using the Apache Sedona weighting functions, you need to pass the +id column and value column to the saved attributes parameter (`savedAttributes` in Scala, `saved_attributes` in Python). + +=== "Scala" + + ```scala + val weights = Weighting.addDistanceBandColumn( + positiveCorrelationFrame, + 1.0, + savedAttributes = Seq("id", "value") + ) + + val moranResult = Moran.getGlobal(weights, idColumn = "id") + + // result fields + moranResult.getPNorm + moranResult.getI + moranResult.getZNorm + ``` + +=== "Python" + + ```python + from sedona.spark.stats.autocorrelation.moran import Moran + from sedona.spark.stats.weighting import add_binary_distance_band_column + + result = add_binary_distance_band_column( + df, + 1.0, + saved_attributes=["id", "value"] + ) + + moran_i_result = Moran.get_global(result) + + ## result fields + moran_i_result.p_norm + moran_i_result.i + moran_i_result.z_norm + ``` + +The result contains the Moran's I value, the Z norm and the P norm. 
+ +The full signatures of the functions + + +=== "Scala" + + ```scala + def getGlobal( + dataframe: DataFrame, + twoTailed: Boolean = true, + idColumn: String = ID_COLUMN, + valueColumnName: String = VALUE_COLUMN): MoranResult + + // java interface + public interface MoranResult { + public double getI(); + public double getPNorm(); + public double getZNorm(); + } + ``` + +=== "Python" + + ```python + def get_global( + df: DataFrame, + two_tailed: bool = True, + id_column: str = "id", + value_column: str = "value", + ) -> MoranResult + + @dataclass + class MoranResult: + i: float + p_norm: float + z_norm: float + ``` diff --git a/docs/image/moranI.png b/docs/image/moranI.png new file mode 100644 index 0000000000000000000000000000000000000000..157c2e36e67d3652474ef8075bb67fabec534465 GIT binary patch literal 5422 zcmb_fWl$7s+g(DsRYF2Rkx-fymPSBWU}>bAWoZPKZj>(RlvZ*{5m*ESq+=DyrKG!? zC0^d=ndkfW{dpgM+;e8`x#rAuU+0{e`-)IgktZd3Km-5)NWluS8UVnZ1^@udmjLJX zM&vVB9sochXd@$|W@BLi0C=JisUBLIlhj?OTwsMqp8?n!FP(zJ;f3#hNPAPtvOd9j z8NtU~Jn_@))tfDrV5qFTGy@!tYgv^SE5gt2-pyZ~%g;|1YpydJ1h_bAmYmghs&w1A zS{B9l!g5@9jf7~VWZDuH4xAx@*v@%^5`yo=Ie;;cO+0MfZvel&m3=6cioN~4Tw1XJ zN}dV;9y_-PnP=M#wLW%UGM_JXjpmu4#FX)cxF7~fgbtBY`-5ObB`syMY59JZcZ z*k)=~F588XYFSCSBVa$Undw)T=QcdOaWM~uaYD}XaFCz{Sv~dky@~vna97T&UJbCB zeSVEf4QDD1Wm$RHG_m@(kVmF(8S0eN$QmDQtl4Z}ov5i2>`4(#KEjp%$j{FLt&)$9 zB~&8`V5SPTw>11M#iJ4V{`y+LP3mrpbzi|_OAZX4(ZO%51?=#5zDEWUYfqR;9?|VQ zmOGM@6B{|RCOt3CyS^!pD}H#NOd~(^-uYg%eayVFEiW+q!7I`{bbM&x!*!Wgc@_X$ z?n1V8;Mdr=^CbKvl|G|BXk@U9Uq-R)g7YxQ&=a=9UsueC_#%2bg$ovsCFPCy z(BBHkd@tcb$&5+TlIc98dQ8stJnY$-XGrMb+7LWTB9cj}{>rxy*^HdxCEqzN!W5JP zT|HanInP}Dci5VEq*i-S0ufH(SKm1nXq`i?Su+A549po3?h6zP@uH4<8&+`x)RMZY zOiSc*0w%`$Z6A_AUYKzN2umz@Yep`R*+xS_bBt9@0 zo8$+8kr!~c3SjV=iRKOdLxMol@IYFHcUozYrKlimtsCVu;;7tveP*oXZ#gHhr*f(N z1O0yBBHobr2i3k|;_e_-lcEs|HTJ}E;v*?AQ;rUyGyRlyezAk4< z5Y)n4Id-s<+G2Uaww!1xa@0)H9KYIPg0B2-Q|wF|vO1tjNI0l$UZjD~F($54+?k9i zf^9)_&)yf$IGU#|burnKw1wX3d)z#1zrO)HAo`S2s+h|A9o@WwCzT;fsKVnK0uH_$ 
z_Q#yz5AuVorN_@%@*a7Ik3UJajJF@AoM|(&r0QjZy+7=QTHe=FdXd1^6Sc%xlX@v- zrV-6fcF##s7)r&O$)Bj9SdzPy%aEHTi_)N3zLTstT7Xs)|52N!IQVfWkGsGzeuTs18Xp7MJXz8-#X;81{x3Th^9o}(Wlu?jx3EViHuSVSImgi{V4nKB{a3q zESZ>-TzhsHxc(aM;%6o>Otk%PH{qImiE*_63Q_E0)Kp)#2*B)}~pO)0SeI z+S)*!V~rTi`z7332(6hiZ~f!g4LhI%&<9wdkIXQ1Xhf^j71)lCX7i;h+gSkwV)S|p zhsrIOV6#>&8n6=4lCU!35t?G1GNDovwSau-@v7n9*2<&oW5quvDxFmdCLX>}tlA zs=)Oe{UazXOXS#5_?REBR(CbZvieB^QFV!m5*Jxz#MSM&PqybV5X&!_a zd_uHtlT1CBQgYW4Rc#V$g1E14JNpP>7%;XNSF(n;O>cY55@l9t`)Diq7WjY`6(Xe| z7ltT?O~zf}b&wn=Sr`Zsr?UgSaM&uXF6#1swk zZ?$Mo=y3IX{A`Z2ihu%li@>v|dU46c-8CCm1YZ${9Jgi3+jZ4B4v$hgPZ}!C*oJNc zw`sT4%M8fW$ZW`X%gnr7eaA#v@J=oiLDclXkzt6ul6W^}FFcLLnMH$(pSgzfz%j`$ zNw5^kBEe_AkP#tNfXMIs3E`u-%T)Vx<#jn^D|#YlvDmrVdDNMFp}P|mBboe7F)gL) z*}R;&=|bdpdhi$ULE#t6n5Cm7HKzm=_R3>heb>opZH>g90KrIAG2EMbcq_1RECbH(q%XD)uy7B zD_13{aS6tVkRdJYqwQgOEqZ0OOi+wfW$(_=KB}Z=ozhuOJHM#WtkJ;5WTkUydtei# z+B>J2tMOj_jaoU^(A}IgM}EJtBbA7^gUv5;c(&{N{T2jd_#q%}B9y6%~lYCCbaqUkA3X?2ysF<9VbKMhjGpyie z*GGC2M=E<{70A@JgL;u`*#3Zpt?O!A_*(}q2`+qp_X{srnV#J-^Nh?h zG66Ck&~!lInZlfU@6T+D$?lyxh+UJj@9@47+ky%_D_YF!BK5sVY|cWKhzt4@zDc($ zfFE#tS+`z}ne1|}Kf!5jymHU&XfD{#>TX;)G+s3jVS_WZ{0^9(=**33H6&_mmeg;S9NFAR_Gir7 zC;^fNWc%uB02X54?Gpkoa{Jpgg7A%ko+|)A&HCSi1=e`7ce`N(swr#9-Q3)yq@>)I zU}IxjTU)<-_bx3hjf{*eC@6@Bhet_CiHL|uUS8hM&rd`|1Ox&tE-p?^PU`CF8Wgt-B8Y?R+7!1b9$cT%J`{cig2CXQKYw<2cMAy#Sz20B zQ&ZE?(Ge39KYjYt(M)?60KmTomX+4>L~o({ZH={QyXQ}RTpi7;__zpcq~-7Dz0@!D zl*pejM?YesAWkqHwSNBspBG%zjMJ7byhrx+VOu=>_nq*y39oD4xlbrptjx|?pDSIv z>f4=CL7$$r`QTuHef2zfMtx|fFRGw0?$WntEoK~5xs)~8z^8ao6}nNUBM;uxc_pnw zmai&#GtB-egAb%Wr3s&qaW|Byb6|;+5BY%0Sk#@ZmAGcFHkvBk8rnMc@@T!|SZVb2 zkUpHMg^m$}uT2aZHIq*k7}v|u9%_@VXn=_n*|xf-_>{Puco(^R!vDIEz1`BxSc|^$ z>+~JII(Hu^nn7hnrGM$gqfI#>0=%OK%;QNlVErkiaQvI^58ylc+rDmj{ypBdbj#B& zVs9e>tFEmWT>UyeZg=ptHB+;$HKTS~$z*Bk7!l|C;=+n_)c*AK(5qA5A@9?11HN6j zNi&1yVA^zzt*~5}xTPjgYxC)Wt}rKEX292AO+h1>6DQKq9RKF(wbIUxtX)pKYu;5A zsJKodY-K8|dlONR)mka(F1i|)(UkTJSeoap>&CHa7_!aK_0`C2elo416d!c|J`<*3 ztvjofuUu5t1RMil@CD=aS-MBLb@|+F5>nboHQgaz_9( 
zj?@Vj+1hAM^KE#>#J7AJk^9*Wbah5 zaay6D(tkrx{hgV=jQO{*{>si-rBRb!9TriC@b!ss`1aI)j_|*#{tqhrfocDv+CNkO_xOJX4N#msbVFg_ zjO9KpE4}tP^l}1oJ4c2+jTmvA+KZ9H=b4k*MG5%g{Y(1leHE2F9 zm6Ff1E7x%7f3(v)5;n@5l-P~TR{7NpJ#Bdb(HpElCOr2h(>O8LtW`)6Pt3~6u3r}s z6+az5w)leTJu=MTS#M11Sl3$`INI9YJ8=VfHJG>xc%7H7xE)y8O0apH+MPU~1o$Td z{)*Dye*1$j|MwC7AI$LoMfo3-?CCVLo+@jfZ*~7%&rbIm`SsyW;t9D=y{(m%|0o>i fz}3oe{1Q(oBponmJ;iYQI{|>@RAeh&ng;&|5VqIX literal 0 HcmV?d00001 diff --git a/python/sedona/spark/stats/autocorrelation/moran.py b/python/sedona/spark/stats/autocorrelation/moran.py index 9b7717a0f7f..fb008fa5d50 100644 --- a/python/sedona/spark/stats/autocorrelation/moran.py +++ b/python/sedona/spark/stats/autocorrelation/moran.py @@ -31,14 +31,17 @@ class Moran: @staticmethod def get_global( - df: DataFrame, two_tailed: bool = True, id_column: str = "id" + df: DataFrame, + two_tailed: bool = True, + id_column: str = "id", + value_column: str = "value", ) -> MoranResult: sedona = SparkSession.getActiveSession() _jvm = sedona._jvm moran_result = ( sedona._jvm.org.apache.sedona.stats.autocorelation.Moran.getGlobal( - df._jdf, two_tailed, id_column + df._jdf, two_tailed, id_column, value_column ) ) diff --git a/python/sedona/spark/stats/weighting.py b/python/sedona/spark/stats/weighting.py index 8475d0b4bea..d12639255e1 100644 --- a/python/sedona/spark/stats/weighting.py +++ b/python/sedona/spark/stats/weighting.py @@ -61,7 +61,7 @@ def add_distance_band_column( """ sedona = SparkSession.getActiveSession() return DataFrame( - sedona._jvm.org.apache.sedona.stats.Weighting.addDistanceBandColumn( + sedona._jvm.org.apache.sedona.stats.Weighting.addDistanceBandColumnPython( dataframe._jdf, float(threshold), binary, @@ -114,11 +114,14 @@ def add_binary_distance_band_column( sedona = SparkSession.getActiveSession() return DataFrame( - sedona._jvm.org.apache.sedona.stats.Weighting.addBinaryDistanceBandColumn( + sedona._jvm.org.apache.sedona.stats.Weighting.addDistanceBandColumnPython( dataframe._jdf, 
float(threshold), + True, + float(-1.0), include_zero_distance_neighbors, include_self, + float(1.0), geometry, use_spheroid, saved_attributes, @@ -168,13 +171,14 @@ def add_weighted_distance_band_column( sedona = SparkSession.getActiveSession() return DataFrame( - sedona._jvm.org.apache.sedona.stats.Weighting.addBinaryDistanceBandColumn( + sedona._jvm.org.apache.sedona.stats.Weighting.addDistanceBandColumnPython( dataframe._jdf, float(threshold), - float(alpha), + False, + alpha, include_zero_distance_neighbors, include_self, - float(self_weight), + self_weight, geometry, use_spheroid, saved_attributes, diff --git a/python/tests/stats/test_moran.py b/python/tests/stats/test_moran.py index 84b13ee379d..d910d79f1cf 100644 --- a/python/tests/stats/test_moran.py +++ b/python/tests/stats/test_moran.py @@ -15,49 +15,81 @@ # specific language governing permissions and limitations # under the License. -from sedona.spark.stats.autocorrelation.moran import Moran, MoranResult +from sedona.spark.stats.autocorrelation.moran import Moran from sedona.spark.stats.weighting import add_binary_distance_band_column from tests.test_base import TestBase class TestMoran(TestBase): - def test_moran_integration(self): - data = [ - (1, 1.0, 1.0, 8.5), - (2, 1.5, 1.2, 8.2), - (3, 1.3, 1.8, 8.7), - (4, 1.7, 1.6, 7.9), - (5, 4.0, 1.5, 6.2), - (6, 4.2, 1.7, 6.5), - (7, 4.5, 1.3, 5.9), - (8, 4.7, 1.8, 6.0), - (9, 1.8, 4.3, 3.1), - (10, 1.5, 4.5, 3.4), - (11, 1.2, 4.7, 3.0), - (12, 1.6, 4.2, 3.3), - (13, 1.9, 4.8, 2.8), - (14, 4.3, 4.2, 1.2), - (15, 4.5, 4.5, 1.5), - (16, 4.7, 4.8, 1.0), - (17, 4.1, 4.6, 1.3), - (18, 4.8, 4.3, 1.1), - (19, 4.2, 4.9, 1.4), - (20, 4.6, 4.1, 1.6), - ] + data = [ + (1, 1.0, 1.0, 8.5), + (2, 1.5, 1.2, 8.2), + (3, 1.3, 1.8, 8.7), + (4, 1.7, 1.6, 7.9), + (5, 4.0, 1.5, 6.2), + (6, 4.2, 1.7, 6.5), + (7, 4.5, 1.3, 5.9), + (8, 4.7, 1.8, 6.0), + (9, 1.8, 4.3, 3.1), + (10, 1.5, 4.5, 3.4), + (11, 1.2, 4.7, 3.0), + (12, 1.6, 4.2, 3.3), + (13, 1.9, 4.8, 2.8), + (14, 4.3, 4.2, 
1.2), + (15, 4.5, 4.5, 1.5), + (16, 4.7, 4.8, 1.0), + (17, 4.1, 4.6, 1.3), + (18, 4.8, 4.3, 1.1), + (19, 4.2, 4.9, 1.4), + (20, 4.6, 4.1, 1.6), + ] + def test_moran_integration(self): df = ( - self.spark.createDataFrame(data) + self.spark.createDataFrame(self.data) .selectExpr("_1 as id", "_2 AS x", "_3 AS y", "_4 AS value") .selectExpr("id", "ST_MakePoint(x, y) AS geometry", "value") ) - result = add_binary_distance_band_column(df, 1.0) + result = add_binary_distance_band_column( + df, 1.0, saved_attributes=["id", "value"] + ) + + moran_i_result = Moran.get_global(result) - assert Moran.get_global(result) == MoranResult( - i=0.9614304631460562, p_norm=9.103828801926284e-15, z_norm=7.752321966127421 + assert 0 < moran_i_result.p_norm < 0.00001 + assert 0.9614 < moran_i_result.i < 0.9615 + assert 7.7523 < moran_i_result.z_norm < 7.7524 + + two_tailed_result = Moran.get_global(result, False) + assert 0 < two_tailed_result.p_norm < 0.00001 + assert 0.9614 < two_tailed_result.i < 0.9615 + assert 7.7523 < two_tailed_result.z_norm < 7.7524 + + def test_moran_with_different_column_names(self): + df = ( + self.spark.createDataFrame(self.data) + .selectExpr("_1 as index", "_2 AS x", "_3 AS y", "_4 AS feature_value") + .selectExpr("index", "ST_MakePoint(x, y) AS geometry", "feature_value") ) - assert Moran.get_global(result, False) == MoranResult( - i=0.9614304631460562, p_norm=4.551914400963142e-15, z_norm=7.752321966127421 + result = add_binary_distance_band_column( + df, threshold=1.0, saved_attributes=["index", "feature_value"] ) + + moran_i_result = Moran.get_global( + result, id_column="index", value_column="feature_value" + ) + + assert 0 < moran_i_result.p_norm < 0.00001 + assert 0.9614 < moran_i_result.i < 0.9615 + assert 7.7523 < moran_i_result.z_norm < 7.7524 + + two_tailed_result = Moran.get_global( + result, id_column="index", value_column="feature_value", two_tailed=False + ) + + assert 0 < two_tailed_result.p_norm < 0.00001 + assert 0.9614 < 
two_tailed_result.i < 0.9615 + assert 7.7523 < two_tailed_result.z_norm < 7.7524 diff --git a/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala b/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala index 77136742618..695c31f0b06 100644 --- a/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala +++ b/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala @@ -23,6 +23,8 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.sedona_sql.expressions.st_functions.{ST_Distance, ST_DistanceSpheroid} import org.apache.spark.sql.{Column, DataFrame} +import scala.collection.mutable + object Weighting { private val ID_COLUMN = "__id" @@ -180,6 +182,7 @@ object Weighting { * The input DataFrame with a weight column added containing neighbors and their weights * (always 1) added to each row. */ + def addBinaryDistanceBandColumn( dataframe: DataFrame, threshold: Double, @@ -255,4 +258,36 @@ object Weighting { savedAttributes = savedAttributes, resultName = resultName) + def addDistanceBandColumnPython( + dataframe: DataFrame, + threshold: Double, + binary: Boolean = true, + alpha: Double = -1.0, + includeZeroDistanceNeighbors: Boolean = false, + includeSelf: Boolean = false, + selfWeight: Double = 1.0, + geometry: String = null, + useSpheroid: Boolean = false, + savedAttributes: java.util.ArrayList[String] = null, + resultName: String = "weights"): DataFrame = { + + val savedAttributesScala = if (savedAttributes != null) { + Seq(savedAttributes.toArray: _*).map { s => + s.asInstanceOf[String] + } + } else Seq() + + addDistanceBandColumn( + dataframe, + threshold, + binary, + alpha, + includeZeroDistanceNeighbors, + includeSelf, + selfWeight, + geometry, + useSpheroid, + savedAttributesScala, + resultName) + } } diff --git a/spark/common/src/main/scala/org/apache/sedona/stats/autocorelation/Moran.scala b/spark/common/src/main/scala/org/apache/sedona/stats/autocorelation/Moran.scala index 
7e06a069ddc..7e92e9e4d42 100644 --- a/spark/common/src/main/scala/org/apache/sedona/stats/autocorelation/Moran.scala +++ b/spark/common/src/main/scala/org/apache/sedona/stats/autocorelation/Moran.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.functions.{col, explode, pow} object Moran { private val ID_COLUMN = "id" + private val VALUE_COLUMN = "value" private def normSf(x: Double, mean: Double = 0.0, stdDev: Double = 1.0): Double = { val normalDist = new NormalDistribution(mean, stdDev) @@ -39,12 +40,15 @@ object Moran { def getGlobal( dataframe: DataFrame, twoTailed: Boolean = true, - idColumn: String = ID_COLUMN): MoranResult = { + idColumn: String = ID_COLUMN, + valueColumnName: String = VALUE_COLUMN): MoranResult = { val spark = dataframe.sparkSession import spark.implicits._ + dataframe.printSchema() + val data = dataframe - .selectExpr("avg(value)", "count(*)") + .selectExpr(s"avg($valueColumnName)", "count(*)") .as[(Double, Long)] .head() @@ -56,7 +60,7 @@ object Moran { .select(col(idColumn), explode(col("weights")).alias("col")) .select( $"$idColumn".alias("id"), - $"col.neighbor.id".alias("n_id"), + $"col.neighbor.$idColumn".alias("n_id"), $"col.value".alias("weight_value")) val s1Data = explodedWeights @@ -83,9 +87,9 @@ object Moran { val inumData = dataframe .selectExpr( s"$idColumn AS id", - "value", - f"value - ${yMean} AS z", - f"transform(weights, w -> struct(w.neighbor.id AS id, w.value AS w, w.neighbor.value, w.neighbor.value - ${yMean} AS z)) AS weight") + s"$valueColumnName AS value", + f"$valueColumnName - ${yMean} AS z", + f"transform(weights, w -> struct(w.neighbor.$idColumn AS id, w.value AS w, w.neighbor.$valueColumnName, w.neighbor.$valueColumnName - ${yMean} AS z)) AS weight") .selectExpr( "z", "AGGREGATE(transform(weight, x-> x.z*x.w), CAST(0.0 AS DOUBLE), (acc, x) -> acc + x) AS ZL", diff --git a/spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/MoranTest.scala 
b/spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/MoranTest.scala index c350959f95c..367803a94ef 100644 --- a/spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/MoranTest.scala +++ b/spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/MoranTest.scala @@ -43,6 +43,26 @@ class MoranTest extends TestBaseScala with AutoCorrelationFixtures { assert(moranResult.getI > 0.99) } + it("different id and value column names") { + val weights = Weighting + .addDistanceBandColumn( + positiveCorrelationFrame + .selectExpr("id AS index", "value as feature_value", "geometry"), + 1.0, + savedAttributes = Seq("index", "feature_value")) + .withColumn( + "weights", + expr("transform(weights, w -> struct(w.neighbor, w.value/size(weights) AS value))")) + + weights.cache().count() + + val moranResult = + Moran.getGlobal(weights, idColumn = "index", valueColumnName = "feature_value") + + assert(moranResult.getPNorm < 0.0001) + assert(moranResult.getI > 0.99) + } + it("correlation is negative") { val weights = Weighting .addDistanceBandColumn( From 766372b39578881819c274525fade91c9f62a9fd Mon Sep 17 00:00:00 2001 From: pawelkocinski Date: Thu, 5 Jun 2025 23:30:24 +0200 Subject: [PATCH 04/10] SEDONA-738 Fix unit tests. --- docs/api/stats/sql.md | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/api/stats/sql.md b/docs/api/stats/sql.md index 1fcf8997818..8be7d57462f 100644 --- a/docs/api/stats/sql.md +++ b/docs/api/stats/sql.md @@ -217,7 +217,6 @@ In the result you get the Z norm, P norm and Moran I value. The full signatures of the functions - === "Scala" ```scala From 89077d40fb1a05d83f21e41e7c49b7e0b845c96b Mon Sep 17 00:00:00 2001 From: pawelkocinski Date: Fri, 6 Jun 2025 00:03:16 +0200 Subject: [PATCH 05/10] SEDONA-738 Fix unit tests. 
--- .../src/main/scala/org/apache/sedona/stats/Weighting.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala b/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala index 695c31f0b06..3dcdfd13f75 100644 --- a/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala +++ b/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala @@ -23,8 +23,6 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.sedona_sql.expressions.st_functions.{ST_Distance, ST_DistanceSpheroid} import org.apache.spark.sql.{Column, DataFrame} -import scala.collection.mutable - object Weighting { private val ID_COLUMN = "__id" @@ -182,7 +180,6 @@ object Weighting { * The input DataFrame with a weight column added containing neighbors and their weights * (always 1) added to each row. */ - def addBinaryDistanceBandColumn( dataframe: DataFrame, threshold: Double, @@ -275,7 +272,7 @@ object Weighting { Seq(savedAttributes.toArray: _*).map { s => s.asInstanceOf[String] } - } else Seq() + } else null addDistanceBandColumn( dataframe, From ade873d52a243b8ad1c3927b2038d4cbf050e556 Mon Sep 17 00:00:00 2001 From: pawelkocinski Date: Fri, 6 Jun 2025 00:08:01 +0200 Subject: [PATCH 06/10] SEDONA-738 Fix unit tests. 
--- .../main/scala/org/apache/sedona/stats/Weighting.scala | 9 ++++----- .../org/apache/sedona/stats/autocorelation/Moran.scala | 2 -- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala b/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala index 3dcdfd13f75..49f78633150 100644 --- a/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala +++ b/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala @@ -22,6 +22,7 @@ import org.apache.sedona.util.DfUtils.getGeometryColumnName import org.apache.spark.sql.functions._ import org.apache.spark.sql.sedona_sql.expressions.st_functions.{ST_Distance, ST_DistanceSpheroid} import org.apache.spark.sql.{Column, DataFrame} +import scala.collection.JavaConverters._ object Weighting { @@ -268,11 +269,9 @@ object Weighting { savedAttributes: java.util.ArrayList[String] = null, resultName: String = "weights"): DataFrame = { - val savedAttributesScala = if (savedAttributes != null) { - Seq(savedAttributes.toArray: _*).map { s => - s.asInstanceOf[String] - } - } else null + val savedAttributesScala = + if (savedAttributes != null) savedAttributes.asScala + else null addDistanceBandColumn( dataframe, diff --git a/spark/common/src/main/scala/org/apache/sedona/stats/autocorelation/Moran.scala b/spark/common/src/main/scala/org/apache/sedona/stats/autocorelation/Moran.scala index 7e92e9e4d42..cf68324eb7a 100644 --- a/spark/common/src/main/scala/org/apache/sedona/stats/autocorelation/Moran.scala +++ b/spark/common/src/main/scala/org/apache/sedona/stats/autocorelation/Moran.scala @@ -45,8 +45,6 @@ object Moran { val spark = dataframe.sparkSession import spark.implicits._ - dataframe.printSchema() - val data = dataframe .selectExpr(s"avg($valueColumnName)", "count(*)") .as[(Double, Long)] From 45a21cc632579215a6d293f9db3c7492153f9097 Mon Sep 17 00:00:00 2001 From: pawelkocinski Date: Fri, 6 Jun 2025 16:56:30 +0200 Subject: 
[PATCH 07/10] SEDONA-738 Fix scala 2.13 issue --- .../src/main/scala/org/apache/sedona/stats/Weighting.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala b/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala index 49f78633150..d404f2c2dbc 100644 --- a/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala +++ b/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala @@ -270,7 +270,7 @@ object Weighting { resultName: String = "weights"): DataFrame = { val savedAttributesScala = - if (savedAttributes != null) savedAttributes.asScala + if (savedAttributes != null) savedAttributes.asScala.toSeq else null addDistanceBandColumn( From 848ec10bd6b89d64c4983d5e75c64a94c59e70f3 Mon Sep 17 00:00:00 2001 From: Jia Yu Date: Thu, 17 Jul 2025 08:21:59 -0700 Subject: [PATCH 08/10] Update spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/AutoCorrelationFixtures.scala Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../sedona/stats/autocorellation/AutoCorrelationFixtures.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/AutoCorrelationFixtures.scala b/spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/AutoCorrelationFixtures.scala index c87ae430f1e..5209bc7d3f0 100644 --- a/spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/AutoCorrelationFixtures.scala +++ b/spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/AutoCorrelationFixtures.scala @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.sedona.stats.autocorellation +package org.apache.sedona.stats.autocorrelation import org.apache.sedona.spark.SedonaContext import org.apache.sedona.sql.TestBaseScala From 3b96dd85f2559b3c664d8e23346ce42b44d72b52 Mon Sep 17 00:00:00 2001 From: Jia Yu Date: Thu, 17 Jul 2025 08:41:42 -0700 Subject: [PATCH 09/10] Fix typos --- python/sedona/spark/register/java_libs.py | 2 +- python/sedona/spark/stats/autocorrelation/moran.py | 2 +- .../stats/{autocorelation => autocorrelation}/Moran.scala | 2 +- .../AutoCorrelationFixtures.scala | 0 .../{autocorellation => autocorrelation}/MoranTest.scala | 4 ++-- 5 files changed, 5 insertions(+), 5 deletions(-) rename spark/common/src/main/scala/org/apache/sedona/stats/{autocorelation => autocorrelation}/Moran.scala (98%) rename spark/common/src/test/scala/org/apache/sedona/stats/{autocorellation => autocorrelation}/AutoCorrelationFixtures.scala (100%) rename spark/common/src/test/scala/org/apache/sedona/stats/{autocorellation => autocorrelation}/MoranTest.scala (97%) diff --git a/python/sedona/spark/register/java_libs.py b/python/sedona/spark/register/java_libs.py index d9f76831b4b..8ba82f2c3fa 100644 --- a/python/sedona/spark/register/java_libs.py +++ b/python/sedona/spark/register/java_libs.py @@ -65,7 +65,7 @@ class SedonaJvmLib(Enum): st_predicates = "org.apache.spark.sql.sedona_sql.expressions.st_predicates" st_aggregates = "org.apache.spark.sql.sedona_sql.expressions.st_aggregates" SedonaContext = "org.apache.sedona.spark.SedonaContext" - Moran = "org.apache.sedona.stats.autocorelation.Moran" + Moran = "org.apache.sedona.stats.autocorrelation.Moran" @classmethod def from_str(cls, geo_lib: str) -> "SedonaJvmLib": diff --git a/python/sedona/spark/stats/autocorrelation/moran.py b/python/sedona/spark/stats/autocorrelation/moran.py index fb008fa5d50..506aed12b9a 100644 --- a/python/sedona/spark/stats/autocorrelation/moran.py +++ b/python/sedona/spark/stats/autocorrelation/moran.py @@ -40,7 +40,7 @@ def 
get_global( _jvm = sedona._jvm moran_result = ( - sedona._jvm.org.apache.sedona.stats.autocorelation.Moran.getGlobal( + sedona._jvm.org.apache.sedona.stats.autocorrelation.Moran.getGlobal( df._jdf, two_tailed, id_column, value_column ) ) diff --git a/spark/common/src/main/scala/org/apache/sedona/stats/autocorelation/Moran.scala b/spark/common/src/main/scala/org/apache/sedona/stats/autocorrelation/Moran.scala similarity index 98% rename from spark/common/src/main/scala/org/apache/sedona/stats/autocorelation/Moran.scala rename to spark/common/src/main/scala/org/apache/sedona/stats/autocorrelation/Moran.scala index cf68324eb7a..8e38cfdd0b0 100644 --- a/spark/common/src/main/scala/org/apache/sedona/stats/autocorelation/Moran.scala +++ b/spark/common/src/main/scala/org/apache/sedona/stats/autocorrelation/Moran.scala @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.sedona.stats.autocorelation +package org.apache.sedona.stats.autocorrelation import org.apache.commons.math3.distribution.NormalDistribution import org.apache.sedona.stats.autocorrelation.MoranResult diff --git a/spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/AutoCorrelationFixtures.scala b/spark/common/src/test/scala/org/apache/sedona/stats/autocorrelation/AutoCorrelationFixtures.scala similarity index 100% rename from spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/AutoCorrelationFixtures.scala rename to spark/common/src/test/scala/org/apache/sedona/stats/autocorrelation/AutoCorrelationFixtures.scala diff --git a/spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/MoranTest.scala b/spark/common/src/test/scala/org/apache/sedona/stats/autocorrelation/MoranTest.scala similarity index 97% rename from spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/MoranTest.scala rename to spark/common/src/test/scala/org/apache/sedona/stats/autocorrelation/MoranTest.scala index 
367803a94ef..f26f79197b2 100644 --- a/spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/MoranTest.scala +++ b/spark/common/src/test/scala/org/apache/sedona/stats/autocorrelation/MoranTest.scala @@ -16,11 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.sedona.stats.autocorellation +package org.apache.sedona.stats.autocorrelation import org.apache.sedona.sql.TestBaseScala import org.apache.sedona.stats.Weighting -import org.apache.sedona.stats.autocorelation.Moran +import org.apache.sedona.stats.autocorrelation.Moran import org.apache.spark.sql.functions.expr class MoranTest extends TestBaseScala with AutoCorrelationFixtures { From 804c9e610957804eb6f9065540c76a1dbe472e60 Mon Sep 17 00:00:00 2001 From: Jia Yu Date: Thu, 17 Jul 2025 10:56:16 -0700 Subject: [PATCH 10/10] Update doc --- docs/api/stats/sql.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/docs/api/stats/sql.md b/docs/api/stats/sql.md index 8be7d57462f..005e647a9fa 100644 --- a/docs/api/stats/sql.md +++ b/docs/api/stats/sql.md @@ -136,7 +136,7 @@ names in parentheses are python variable names In both cases the output is the input DataFrame with the weights column added to each row. -# MoranI +## Moran I Moran I is the spatial autocorrelation algorithm, which is using spatial location and non-spatial attribute. When the value is close to the 1 it @@ -173,8 +173,7 @@ function is: You can manipulate the value column name and id using function parameters. -To use the Apache Sedona weight functions you need to pass the -id column and value column to kept parameters. +To use the [Apache Sedona weight functions](#adddistancebandcolumn) you need to pass the id column and value column to kept parameters. === "Scala"