From b98268d5f5beccce819e0a9162b4587807cd2824 Mon Sep 17 00:00:00 2001 From: zhangfengcdt Date: Fri, 11 Apr 2025 17:05:15 -0700 Subject: [PATCH 1/3] [SEDONA-720] Refactor GeoPandas Dataframe implementation --- python/sedona/geopandas/geodataframe.py | 141 ++++++++++---------- python/tests/geopandas/test_geodataframe.py | 6 +- 2 files changed, 72 insertions(+), 75 deletions(-) diff --git a/python/sedona/geopandas/geodataframe.py b/python/sedona/geopandas/geodataframe.py index bdef237c1fa..dcdd077e9d9 100644 --- a/python/sedona/geopandas/geodataframe.py +++ b/python/sedona/geopandas/geodataframe.py @@ -30,6 +30,7 @@ from pyspark.pandas.frame import DataFrame as PandasOnSparkDataFrame from pyspark.pandas import Series as PandasOnSparkSeries +from pyspark.pandas.internal import InternalFrame class GeoDataFrame(GeoFrame, pspd.DataFrame): """ @@ -136,16 +137,31 @@ def __init__( self._col_label: Label from sedona.geopandas import GeoSeries + from pyspark.sql import DataFrame as SparkDataFrame if isinstance( - data, (GeoDataFrame, GeoSeries, PandasOnSparkSeries, PandasOnSparkDataFrame) + data, (GeoDataFrame, GeoSeries) ): assert dtype is None assert not copy - self._anchor = data self._col_label = index + elif isinstance(data, (PandasOnSparkSeries, PandasOnSparkDataFrame)): + assert columns is None + assert dtype is None + assert not copy + if index is None: + internal = InternalFrame(spark_frame=data._internal.spark_frame) + object.__setattr__(self, "_internal_frame", internal) + elif isinstance(data, SparkDataFrame): + assert columns is None + assert dtype is None + assert not copy + if index is None: + internal = InternalFrame(spark_frame=data, index_spark_columns=None) + object.__setattr__(self, "_internal_frame", internal) else: + # below are not distributed dataframe types if isinstance(data, pd.DataFrame): assert index is None assert dtype is None @@ -184,6 +200,55 @@ def _reduce_for_geostat_function( # Implementation of the abstract method raise NotImplementedError("This method is not implemented yet.") + def _process_geometry_columns( + self, operation: str, rename_suffix: str = "", *args, **kwargs + ) -> "GeoDataFrame": + """ + Helper method to process geometry columns with a specified operation. + + Parameters + ---------- + operation : str + The spatial operation to apply (e.g., 'ST_Area', 'ST_Buffer'). + rename_suffix : str, default "" + Suffix to append to the resulting column name. + args : tuple + Positional arguments for the operation. + kwargs : dict + Keyword arguments for the operation. + + Returns + ------- + GeoDataFrame + A new GeoDataFrame with the operation applied to geometry columns. + """ + select_expressions = [] + + for field in self._internal.spark_frame.schema.fields: + col_name = field.name + + # Skip index and order columns + if col_name in ("__index_level_0__", "__natural_order__"): + continue + + if field.dataType.typeName() in ("geometrytype", "binary"): + # Prepare arguments for the operation + positional_params = ", ".join([repr(v) for v in args]) + keyword_params = ", ".join([repr(v) for v in kwargs.values()]) + params = ", ".join(filter(None, [positional_params, keyword_params])) + + if field.dataType.typeName() == "binary": + expr = f"{operation}(ST_GeomFromWKB(`{col_name}`){', ' + params if params else ''}) as {col_name}{rename_suffix}" + else: + expr = f"{operation}(`{col_name}`{', ' + params if params else ''}) as {col_name}{rename_suffix}" + select_expressions.append(expr) + else: + # Keep non-geometry columns as they are + select_expressions.append(f"`{col_name}`") + + sdf = self._internal.spark_frame.selectExpr(*select_expressions) + return GeoDataFrame(sdf) + @property def dtypes(self) -> Union[gpd.GeoSeries, pd.Series, Dtype]: # Implementation of the abstract method @@ -256,42 +321,7 @@ def area(self) -> "GeoDataFrame": 0 1.0 1 1 4.0 2 """ - # Create a list of all column expressions for the new dataframe - select_expressions = [] - - # Process geometry columns to calculate areas - for field in self._internal.spark_frame.schema.fields: - col_name = field.name - - # Skip index column to avoid duplication - if col_name == "__index_level_0__" or col_name == "__natural_order__": - continue - - if ( - field.dataType.typeName() == "geometrytype" - or field.dataType.typeName() == "binary" - ): - # Calculate the area for each geometry column - if field.dataType.typeName() == "binary": - area_expr = ( - f"ST_Area(ST_GeomFromWKB(`{col_name}`)) as {col_name}_area" - ) - else: - area_expr = f"ST_Area(`{col_name}`) as {col_name}_area" - select_expressions.append(area_expr) - else: - # Keep non-geometry columns as they are - select_expressions.append(f"`{col_name}`") - - # Execute the query to get all data in one go - result_df = self._internal.spark_frame.selectExpr(*select_expressions) - - # Convert to pandas DataFrame - pandas_df = result_df.toPandas() - - # Create a new GeoDataFrame with the result - # Note: This avoids the need to manipulate the index columns separately - return GeoDataFrame(pandas_df) + return self._process_geometry_columns("ST_Area", rename_suffix="_area") @property def crs(self): @@ -553,40 +583,7 @@ def buffer( >>> gdf = GeoDataFrame(data) >>> buffered = gdf.buffer(0.5) """ - # Create a list of all column expressions for the new dataframe - select_expressions = [] - - # Process each field in the schema - for field in self._internal.spark_frame.schema.fields: - col_name = field.name - - # Skip index and order columns - if col_name == "__index_level_0__" or col_name == "__natural_order__": - continue - - # Apply buffer to geometry columns - if ( - field.dataType.typeName() == "geometrytype" - or field.dataType.typeName() == "binary" - ): - - if field.dataType.typeName() == "binary": - # For binary geometry columns (WKB) - buffer_expr = f"ST_Buffer(ST_GeomFromWKB(`{col_name}`), {distance}) as {col_name}" - else: - # For native geometry columns - buffer_expr = f"ST_Buffer(`{col_name}`, {distance}) as {col_name}" - select_expressions.append(buffer_expr) - else: - # Keep non-geometry columns as they are - select_expressions.append(f"`{col_name}`") - - # Execute the query to get all data in one go - result_df = self._internal.spark_frame.selectExpr(*select_expressions) - - # Convert to pandas DataFrame and create a new GeoDataFrame - pandas_df = result_df.toPandas() - return GeoDataFrame(pandas_df) + return self._process_geometry_columns("ST_Buffer", rename_suffix="_buffered", distance=distance) def sjoin( self, diff --git a/python/tests/geopandas/test_geodataframe.py b/python/tests/geopandas/test_geodataframe.py index 502b5215262..9d7ff2345fd 100644 --- a/python/tests/geopandas/test_geodataframe.py +++ b/python/tests/geopandas/test_geodataframe.py @@ -162,17 +162,17 @@ def test_buffer(self): assert type(buffer_df) is GeoDataFrame # Verify the original columns are preserved - assert "geometry1" in buffer_df.columns + assert "geometry1_buffered" in buffer_df.columns assert "id" in buffer_df.columns assert "value" in buffer_df.columns # Convert to pandas to extract individual geometries - pandas_df = buffer_df._internal.spark_frame.select("geometry1").toPandas() + pandas_df = buffer_df._internal.spark_frame.select("geometry1_buffered").toPandas() # Calculate areas to verify buffer was applied correctly # Point buffer with radius 0.5 should have area approximately π * 0.5² ≈ 0.785 # Square buffer with radius 0.5 should expand the 1x1 square to 2x2 square with rounded corners - areas = [geom.area for geom in pandas_df["geometry1"]] + areas = [geom.area for geom in pandas_df["geometry1_buffered"]] # Check that square buffer area is greater than original (1.0) assert areas[1] > 1.0 From bd00aa67f0a7bbe53ad997298ca955228ce6f3df Mon Sep 17 00:00:00 2001 From: zhangfengcdt Date: Fri, 11 Apr 2025 17:19:04 -0700 Subject: [PATCH 2/3] fix pre-commit error --- python/sedona/geopandas/geodataframe.py | 11 ++++++----- python/tests/geopandas/test_geodataframe.py | 4 +++- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/python/sedona/geopandas/geodataframe.py b/python/sedona/geopandas/geodataframe.py index dcdd077e9d9..d9c29dd7608 100644 --- a/python/sedona/geopandas/geodataframe.py +++ b/python/sedona/geopandas/geodataframe.py @@ -32,6 +32,7 @@ from pyspark.pandas.internal import InternalFrame + class GeoDataFrame(GeoFrame, pspd.DataFrame): """ A class representing a GeoDataFrame, inheriting from GeoFrame and pyspark.pandas.DataFrame. @@ -139,9 +140,7 @@ def __init__( from sedona.geopandas import GeoSeries from pyspark.sql import DataFrame as SparkDataFrame - if isinstance( - data, (GeoDataFrame, GeoSeries) - ): + if isinstance(data, (GeoDataFrame, GeoSeries)): assert dtype is None assert not copy self._anchor = data @@ -201,7 +200,7 @@ def _reduce_for_geostat_function( raise NotImplementedError("This method is not implemented yet.") def _process_geometry_columns( - self, operation: str, rename_suffix: str = "", *args, **kwargs + self, operation: str, rename_suffix: str = "", *args, **kwargs ) -> "GeoDataFrame": """ Helper method to process geometry columns with a specified operation. @@ -583,7 +582,9 @@ def buffer( >>> gdf = GeoDataFrame(data) >>> buffered = gdf.buffer(0.5) """ - return self._process_geometry_columns("ST_Buffer", rename_suffix="_buffered", distance=distance) + return self._process_geometry_columns( + "ST_Buffer", rename_suffix="_buffered", distance=distance + ) def sjoin( self, diff --git a/python/tests/geopandas/test_geodataframe.py b/python/tests/geopandas/test_geodataframe.py index 9d7ff2345fd..326e787adb1 100644 --- a/python/tests/geopandas/test_geodataframe.py +++ b/python/tests/geopandas/test_geodataframe.py @@ -167,7 +167,9 @@ def test_buffer(self): assert "value" in buffer_df.columns # Convert to pandas to extract individual geometries - pandas_df = buffer_df._internal.spark_frame.select("geometry1_buffered").toPandas() + pandas_df = buffer_df._internal.spark_frame.select( + "geometry1_buffered" + ).toPandas() # Calculate areas to verify buffer was applied correctly # Point buffer with radius 0.5 should have area approximately π * 0.5² ≈ 0.785 From 6240bab35c093db6b64f60d57107d6bd1a6ff430 Mon Sep 17 00:00:00 2001 From: zhangfengcdt Date: Fri, 11 Apr 2025 17:31:31 -0700 Subject: [PATCH 3/3] fix pyupgrade lint --- python/sedona/geopandas/geodataframe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/sedona/geopandas/geodataframe.py b/python/sedona/geopandas/geodataframe.py index c6a1426adb0..bc4354e4746 100644 --- a/python/sedona/geopandas/geodataframe.py +++ b/python/sedona/geopandas/geodataframe.py @@ -201,7 +201,7 @@ def _reduce_for_geostat_function( def _process_geometry_columns( self, operation: str, rename_suffix: str = "", *args, **kwargs - ) -> "GeoDataFrame": + ) -> GeoDataFrame: """ Helper method to process geometry columns with a specified operation.