20 changes: 13 additions & 7 deletions src/ydata_profiling/model/spark/describe_counts_spark.py
@@ -36,9 +36,17 @@ def describe_counts_spark(
     value_counts_index_sorted = value_counts.orderBy(F.asc(series.columns[0]))

     # Count missing values
-    n_missing = (
-        value_counts.filter(F.col(series.columns[0]).isNull()).select("count").first()
-    )
+    if series.dtypes[0][1] in ("int", "float", "bigint", "double"):
+        n_missing = (
+            # Need the isnan() check because Pandas' isnull counts NaN as null, but Spark does not
+            value_counts.filter(
+                F.col(series.columns[0]).isNull() | F.isnan(F.col(series.columns[0]))
+            ).select("count").first()
+        )
+    else:
+        n_missing = (
+            value_counts.filter(F.col(series.columns[0]).isNull()).select("count").first()
+        )

     n_missing = n_missing["count"] if n_missing else 0

     # Convert top 200 values to Pandas for frequency table display
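The dtype guard above exists because Spark distinguishes SQL NULL from the floating-point NaN, while Pandas' isnull() treats both as missing. A minimal standalone sketch (not part of this diff) of the difference:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([(1.0,), (None,), (float("nan"),)], ["x"])

df.filter(F.col("x").isNull()).count()                        # 1 -- only the NULL row
df.filter(F.isnan(F.col("x"))).count()                        # 1 -- only the NaN row
df.filter(F.col("x").isNull() | F.isnan(F.col("x"))).count()  # 2 -- both, like Pandas' isnull()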
@@ -60,17 +68,15 @@
             value_counts.filter(F.col(column).isNotNull())  # Exclude NULLs
             .filter(~F.isnan(F.col(column)))  # Remove implicit NaNs (if numeric column)
             .groupBy(column)  # Group by unique values
-            .count()  # Count occurrences
+            .agg(F.sum("count").alias("count"))  # Sum the existing per-value counts
             .orderBy(F.desc("count"))  # Sort in descending order
             .limit(200)  # Limit for performance
         )
     else:
         value_counts_no_nan = (
             value_counts.filter(F.col(column).isNotNull())  # Exclude NULLs
             .groupBy(column)  # Group by unique timestamp values
-            .count()  # Count occurrences
+            .agg(F.sum("count").alias("count"))  # Sum the existing per-value counts
             .orderBy(F.desc("count"))  # Sort by most frequent timestamps
             .limit(200)  # Limit for performance
         )

     # Convert to Pandas Series, forcing proper structure
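The switch from .count() to .agg(F.sum("count")) matters because value_counts already stores one row per distinct value with its frequency in a "count" column; re-grouping that frame and counting rows would collapse every frequency to 1. A standalone sketch (not part of this diff) of the two behaviors:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()
value_counts = spark.createDataFrame([("a", 3), ("b", 2)], ["value", "count"])

# Each distinct value occupies a single row, so counting rows loses the frequencies:
value_counts.groupBy("value").count().show()  # every group reports count == 1

# Summing the pre-computed counts preserves them:
value_counts.groupBy("value").agg(F.sum("count").alias("count")).show()  # a -> 3, b -> 2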
63 changes: 44 additions & 19 deletions src/ydata_profiling/model/spark/describe_numeric_spark.py
@@ -11,6 +11,14 @@
 def numeric_stats_spark(df: DataFrame, summary: dict) -> dict:
     column = df.columns[0]

+    # Drop null/NaN/inf values so the numeric summary stats match the Pandas default, which skips NAs (skipna=True)
+    finite_filter = (
+        F.col(column).isNotNull()
+        & ~F.isnan(F.col(column))
+        & ~F.col(column).isin([np.inf, -np.inf])
+    )
+    non_null_df = df.filter(finite_filter)
+
     expr = [
         F.mean(F.col(column)).alias("mean"),
         F.stddev(F.col(column)).alias("std"),
@@ -21,7 +29,7 @@ def numeric_stats_spark(df: DataFrame, summary: dict) -> dict:
         F.skewness(F.col(column)).alias("skewness"),
         F.sum(F.col(column)).alias("sum"),
     ]
-    return df.agg(*expr).first().asDict()
+    return non_null_df.agg(*expr).first().asDict()


 def describe_numeric_1d_spark(
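The new finite_filter is needed because Spark's aggregates skip NULLs but propagate NaN (and infinities), whereas Pandas skips missing values by default. A standalone sketch (not part of this diff):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([(1.0,), (2.0,), (float("nan"),), (None,)], ["x"])

df.agg(F.mean("x")).first()[0]  # nan -- the NULL is skipped, but the NaN poisons the mean
df.filter(~F.isnan(F.col("x"))).agg(F.mean("x")).first()[0]  # 1.5 -- matches the Pandas default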
@@ -81,30 +89,47 @@ def describe_numeric_1d_spark(
     quantiles = config.vars.num.quantiles
     quantile_threshold = 0.05

-    summary.update(
-        {
-            f"{percentile:.0%}": value
-            for percentile, value in zip(
-                quantiles,
-                df.stat.approxQuantile(
-                    f"{df.columns[0]}",
-                    quantiles,
-                    quantile_threshold,
-                ),
-            )
-        }
-    )
-
-    median = summary["50%"]
-
-    summary["mad"] = df.select(
-        (F.abs(F.col(f"{df.columns[0]}").cast("int") - median)).alias("abs_dev")
-    ).stat.approxQuantile("abs_dev", [0.5], quantile_threshold)[0]
+    if summary.get("n") == summary.get("n_missing"):
+        # This means the entire column is null/nan, so summary values need to be hard-coded:
+        summary.update(
+            {
+                f"{percentile:.0%}": np.nan
+                for percentile in quantiles
+            }
+        )
+
+        summary["mad"] = np.nan
+        summary["iqr"] = np.nan
+
+    else:
+        summary.update(
+            {
+                f"{percentile:.0%}": value
+                for percentile, value in zip(
+                    quantiles,
+                    df.stat.approxQuantile(
+                        f"{df.columns[0]}",
+                        quantiles,
+                        quantile_threshold,
+                    ),
+                )
+            }
+        )
+
+        median = summary.get("50%")
+
+        summary["mad"] = df.select(
+            (F.abs(F.col(f"{df.columns[0]}").cast("int") - median)).alias("abs_dev")
+        ).stat.approxQuantile("abs_dev", [0.5], quantile_threshold)[0]
+
+        summary["iqr"] = summary["75%"] - summary["25%"]

     # FIXME: move to fmt
     summary["p_negative"] = summary["n_negative"] / summary["n"]
-    summary["range"] = summary["max"] - summary["min"]
-    summary["iqr"] = summary["75%"] - summary["25%"]
+    if summary["min"] is None or summary["max"] is None:
+        summary["range"] = np.nan
+    else:
+        summary["range"] = summary["max"] - summary["min"]
     summary["cv"] = summary["std"] / summary["mean"] if summary["mean"] else np.nan
     summary["p_zeros"] = summary["n_zeros"] / summary["n"]
     summary["p_infinite"] = summary["n_infinite"] / summary["n"]
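The n == n_missing guard is needed because approxQuantile ignores NULL and NaN values and returns an empty list when nothing remains, so none of the percentile keys would be populated and the later summary lookups would fail. A standalone sketch (not part of this diff):

from pyspark.sql import SparkSession
from pyspark.sql import types as T

spark = SparkSession.builder.master("local[1]").getOrCreate()
schema = T.StructType([T.StructField("x", T.DoubleType(), True)])
df = spark.createDataFrame([(None,), (None,)], schema=schema)

df.stat.approxQuantile("x", [0.25, 0.5, 0.75], 0.05)  # [] -- empty, so no "25%"/"50%"/"75%" keys are set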
97 changes: 97 additions & 0 deletions tests/issues/test_issue1429.py
@@ -0,0 +1,97 @@
"""
Test for issue 1429:
https://github.com/ydataai/ydata-profiling/issues/1429
"""
import numpy as np
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import types as T

from ydata_profiling.config import SparkSettings
from ydata_profiling.model.spark.describe_counts_spark import describe_counts_spark
from ydata_profiling.model.spark.describe_generic_spark import describe_generic_spark
from ydata_profiling.model.spark.describe_numeric_spark import (
    describe_numeric_1d_spark,
    numeric_stats_spark,
)
from ydata_profiling.model.spark.describe_supported_spark import describe_supported_spark

config = SparkSettings()


def create_test_df(spark: SparkSession) -> DataFrame:
schema = T.StructType(
[
T.StructField("category", T.StringType(), True),
T.StructField("double", T.DoubleType(), True),
T.StructField("int", T.IntegerType(), True),
T.StructField("boolean", T.BooleanType(), True),
T.StructField("null_double", T.DoubleType(), True),
]
)

data = [
(f"test_{num + 1}", float(num), int(num), True, None) for num in range(205)
]

    # Add duplicate rows
    data.extend([("test_1", float(1), int(1), False, None) for _ in range(205)])

    # Add all-null rows
    data.extend([(None, None, None, None, None) for _ in range(100)])

return spark.createDataFrame(data, schema=schema)


def test_describe_numeric_spark(spark_session):
test_df = create_test_df(spark_session)

numeric_stats = numeric_stats_spark(df=test_df.select("double"), summary={})

for _, value in numeric_stats.items():
assert value is not None


def test_describe_numeric_1d_spark_for_null_column_edge_case(spark_session, test_output_dir):
spark = spark_session
test_df = create_test_df(spark)

_, _, summary = describe_counts_spark(config=config, series=test_df.select("null_double"), summary={})

_, _, summary = describe_generic_spark(config=config, df=test_df.select("null_double"), summary=summary)

_, _, summary = describe_supported_spark(config=config, series=test_df.select("null_double"), summary=summary)

_, _, summary = describe_numeric_1d_spark(config=config, df=test_df.select("null_double"), summary=summary)

assert summary["iqr"] is np.nan
assert summary["mad"] is np.nan
assert summary["cv"] is np.nan
assert summary["mean"] is None
assert summary["histogram"] == []


def test_describe_counts_spark(spark_session):
test_df = create_test_df(spark_session)

_, _, summary = describe_counts_spark(config=config, series=test_df.select("category"), summary={})

assert summary["value_counts_without_nan"].loc["test_1"] == 206

_, _, summary = describe_counts_spark(config=config, series=test_df.select("double"), summary={})

assert summary["value_counts_without_nan"].loc[float(1)] == 206

_, _, summary = describe_counts_spark(config=config, series=test_df.select("int"), summary={})

assert summary["value_counts_without_nan"].loc[int(1)] == 206

_, _, summary = describe_counts_spark(config=config, series=test_df.select("boolean"), summary={})

assert summary["value_counts_without_nan"].loc[True] == 205

_, _, summary = describe_counts_spark(config=config, series=test_df.select("null_double"), summary={})

assert summary["value_counts_without_nan"].size == 0
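The tests rely on spark_session and test_output_dir fixtures that the repository's test suite is assumed to provide; a minimal conftest.py sketch with illustrative implementations (the fixture bodies here are assumptions, not part of this diff):

import shutil
import tempfile
from pathlib import Path

import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark_session():
    # A small local session is enough for these unit tests
    spark = (
        SparkSession.builder.master("local[2]")
        .appName("ydata-profiling-tests")
        .getOrCreate()
    )
    yield spark
    spark.stop()


@pytest.fixture
def test_output_dir():
    # Throwaway directory for any artifacts a test writes
    path = Path(tempfile.mkdtemp())
    yield path
    shutil.rmtree(path, ignore_errors=True)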