Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 72 additions & 74 deletions python/sedona/geopandas/geodataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
from pyspark.pandas.frame import DataFrame as PandasOnSparkDataFrame
from pyspark.pandas import Series as PandasOnSparkSeries

from pyspark.pandas.internal import InternalFrame


class GeoDataFrame(GeoFrame, pspd.DataFrame):
"""
Expand Down Expand Up @@ -136,16 +138,29 @@ def __init__(
self._col_label: Label

from sedona.geopandas import GeoSeries
from pyspark.sql import DataFrame as SparkDataFrame

if isinstance(
data, (GeoDataFrame, GeoSeries, PandasOnSparkSeries, PandasOnSparkDataFrame)
):
if isinstance(data, (GeoDataFrame, GeoSeries)):
assert dtype is None
assert not copy

self._anchor = data
self._col_label = index
elif isinstance(data, (PandasOnSparkSeries, PandasOnSparkDataFrame)):
assert columns is None
assert dtype is None
assert not copy
if index is None:
internal = InternalFrame(spark_frame=data._internal.spark_frame)
object.__setattr__(self, "_internal_frame", internal)
elif isinstance(data, SparkDataFrame):
assert columns is None
assert dtype is None
assert not copy
if index is None:
internal = InternalFrame(spark_frame=data, index_spark_columns=None)
object.__setattr__(self, "_internal_frame", internal)
else:
# below are not distributed dataframe types
if isinstance(data, pd.DataFrame):
assert index is None
assert dtype is None
Expand Down Expand Up @@ -184,6 +199,55 @@ def _reduce_for_geostat_function(
# Implementation of the abstract method
raise NotImplementedError("This method is not implemented yet.")

def _process_geometry_columns(
self, operation: str, rename_suffix: str = "", *args, **kwargs
) -> GeoDataFrame:
"""
Helper method to process geometry columns with a specified operation.

Parameters
----------
operation : str
The spatial operation to apply (e.g., 'ST_Area', 'ST_Buffer').
rename_suffix : str, default ""
Suffix to append to the resulting column name.
args : tuple
Positional arguments for the operation.
kwargs : dict
Keyword arguments for the operation.

Returns
-------
GeoDataFrame
A new GeoDataFrame with the operation applied to geometry columns.
"""
select_expressions = []

for field in self._internal.spark_frame.schema.fields:
col_name = field.name

# Skip index and order columns
if col_name in ("__index_level_0__", "__natural_order__"):
continue

if field.dataType.typeName() in ("geometrytype", "binary"):
# Prepare arguments for the operation
positional_params = ", ".join([repr(v) for v in args])
keyword_params = ", ".join([repr(v) for v in kwargs.values()])
params = ", ".join(filter(None, [positional_params, keyword_params]))

if field.dataType.typeName() == "binary":
expr = f"{operation}(ST_GeomFromWKB(`{col_name}`){', ' + params if params else ''}) as {col_name}{rename_suffix}"
else:
expr = f"{operation}(`{col_name}`{', ' + params if params else ''}) as {col_name}{rename_suffix}"
select_expressions.append(expr)
else:
# Keep non-geometry columns as they are
select_expressions.append(f"`{col_name}`")

sdf = self._internal.spark_frame.selectExpr(*select_expressions)
return GeoDataFrame(sdf)

@property
def dtypes(self) -> gpd.GeoSeries | pd.Series | Dtype:
# Implementation of the abstract method
Expand Down Expand Up @@ -256,42 +320,7 @@ def area(self) -> GeoDataFrame:
0 1.0 1
1 4.0 2
"""
# Create a list of all column expressions for the new dataframe
select_expressions = []

# Process geometry columns to calculate areas
for field in self._internal.spark_frame.schema.fields:
col_name = field.name

# Skip index column to avoid duplication
if col_name == "__index_level_0__" or col_name == "__natural_order__":
continue

if (
field.dataType.typeName() == "geometrytype"
or field.dataType.typeName() == "binary"
):
# Calculate the area for each geometry column
if field.dataType.typeName() == "binary":
area_expr = (
f"ST_Area(ST_GeomFromWKB(`{col_name}`)) as {col_name}_area"
)
else:
area_expr = f"ST_Area(`{col_name}`) as {col_name}_area"
select_expressions.append(area_expr)
else:
# Keep non-geometry columns as they are
select_expressions.append(f"`{col_name}`")

# Execute the query to get all data in one go
result_df = self._internal.spark_frame.selectExpr(*select_expressions)

# Convert to pandas DataFrame
pandas_df = result_df.toPandas()

# Create a new GeoDataFrame with the result
# Note: This avoids the need to manipulate the index columns separately
return GeoDataFrame(pandas_df)
return self._process_geometry_columns("ST_Area", rename_suffix="_area")

@property
def crs(self):
Expand Down Expand Up @@ -553,40 +582,9 @@ def buffer(
>>> gdf = GeoDataFrame(data)
>>> buffered = gdf.buffer(0.5)
"""
# Create a list of all column expressions for the new dataframe
select_expressions = []

# Process each field in the schema
for field in self._internal.spark_frame.schema.fields:
col_name = field.name

# Skip index and order columns
if col_name == "__index_level_0__" or col_name == "__natural_order__":
continue

# Apply buffer to geometry columns
if (
field.dataType.typeName() == "geometrytype"
or field.dataType.typeName() == "binary"
):

if field.dataType.typeName() == "binary":
# For binary geometry columns (WKB)
buffer_expr = f"ST_Buffer(ST_GeomFromWKB(`{col_name}`), {distance}) as {col_name}"
else:
# For native geometry columns
buffer_expr = f"ST_Buffer(`{col_name}`, {distance}) as {col_name}"
select_expressions.append(buffer_expr)
else:
# Keep non-geometry columns as they are
select_expressions.append(f"`{col_name}`")

# Execute the query to get all data in one go
result_df = self._internal.spark_frame.selectExpr(*select_expressions)

# Convert to pandas DataFrame and create a new GeoDataFrame
pandas_df = result_df.toPandas()
return GeoDataFrame(pandas_df)
return self._process_geometry_columns(
"ST_Buffer", rename_suffix="_buffered", distance=distance
)

def sjoin(
self,
Expand Down
8 changes: 5 additions & 3 deletions python/tests/geopandas/test_geodataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,17 +162,19 @@ def test_buffer(self):
assert type(buffer_df) is GeoDataFrame

# Verify the original columns are preserved
assert "geometry1" in buffer_df.columns
assert "geometry1_buffered" in buffer_df.columns
assert "id" in buffer_df.columns
assert "value" in buffer_df.columns

# Convert to pandas to extract individual geometries
pandas_df = buffer_df._internal.spark_frame.select("geometry1").toPandas()
pandas_df = buffer_df._internal.spark_frame.select(
"geometry1_buffered"
).toPandas()

# Calculate areas to verify buffer was applied correctly
# Point buffer with radius 0.5 should have area approximately π * 0.5² ≈ 0.785
# Square buffer with radius 0.5 should expand the 1x1 square to 2x2 square with rounded corners
areas = [geom.area for geom in pandas_df["geometry1"]]
areas = [geom.area for geom in pandas_df["geometry1_buffered"]]

# Check that square buffer area is greater than original (1.0)
assert areas[1] > 1.0
Expand Down
Loading