diff --git a/src/ydata_profiling/model/spark/describe_counts_spark.py b/src/ydata_profiling/model/spark/describe_counts_spark.py
index d7a091e7f..4b37fdc76 100644
--- a/src/ydata_profiling/model/spark/describe_counts_spark.py
+++ b/src/ydata_profiling/model/spark/describe_counts_spark.py
@@ -36,9 +36,17 @@ def describe_counts_spark(
     value_counts_index_sorted = value_counts.orderBy(F.asc(series.columns[0]))
 
     # Count missing values
-    n_missing = (
-        value_counts.filter(F.col(series.columns[0]).isNull()).select("count").first()
-    )
+    if series.dtypes[0][1] in ("int", "float", "bigint", "double"):
+        # Pandas isnull() treats NaN as missing but Spark isNull() does not, so
+        # null and NaN can occupy separate rows of value_counts; sum both counts
+        n_missing = (
+            value_counts.filter(
+                F.col(series.columns[0]).isNull() | F.isnan(F.col(series.columns[0]))
+            )
+            .agg(F.sum("count").alias("count"))
+            .first()
+        )
+    else:
+        # Non-numeric columns cannot hold NaN, so a plain null check suffices
+        n_missing = (
+            value_counts.filter(F.col(series.columns[0]).isNull()).select("count").first()
+        )
+    n_missing = (n_missing["count"] or 0) if n_missing else 0
 
     # Convert top 200 values to Pandas for frequency table display
@@ -60,17 +68,15 @@
             value_counts.filter(F.col(column).isNotNull())  # Exclude NaNs
             .filter(~F.isnan(F.col(column)))  # Remove implicit NaNs (if numeric column)
             .groupBy(column)  # Group by unique values
-            .count()  # Count occurrences
+            .agg(F.sum("count").alias("count"))  # Sum the pre-aggregated counts
             .orderBy(F.desc("count"))  # Sort in descending order
-            .limit(200)  # Limit for performance
         )
     else:
         value_counts_no_nan = (
             value_counts.filter(F.col(column).isNotNull())  # Exclude NULLs
             .groupBy(column)  # Group by unique timestamp values
-            .count()  # Count occurrences
+            .agg(F.sum("count").alias("count"))  # Sum the pre-aggregated counts
             .orderBy(F.desc("count"))  # Sort by most frequent timestamps
-            .limit(200)  # Limit for performance
         )
 
     # Convert to Pandas Series, forcing proper structure
diff --git a/src/ydata_profiling/model/spark/describe_numeric_spark.py b/src/ydata_profiling/model/spark/describe_numeric_spark.py
index 5e30c7539..0b616f91c 100644
--- a/src/ydata_profiling/model/spark/describe_numeric_spark.py
+++ b/src/ydata_profiling/model/spark/describe_numeric_spark.py
@@ -11,6 +11,14 @@ def numeric_stats_spark(df: DataFrame, summary: dict) -> dict:
     column = df.columns[0]
 
+    # Drop nulls, NaNs, and infinities before computing summary stats, matching the Pandas default of skipping NAs (skipna=True)
+    finite_filter = (
+        F.col(column).isNotNull()
+        & ~F.isnan(F.col(column))
+        & ~F.col(column).isin([np.inf, -np.inf])
+    )
+    non_null_df = df.filter(finite_filter)
+
     expr = [
         F.mean(F.col(column)).alias("mean"),
         F.stddev(F.col(column)).alias("std"),
@@ -21,7 +29,7 @@ def numeric_stats_spark(df: DataFrame, summary: dict) -> dict:
         F.skewness(F.col(column)).alias("skewness"),
         F.sum(F.col(column)).alias("sum"),
     ]
-    return df.agg(*expr).first().asDict()
+    return non_null_df.agg(*expr).first().asDict()
 
 
 def describe_numeric_1d_spark(
@@ -81,30 +89,47 @@ def describe_numeric_1d_spark(
     quantiles = config.vars.num.quantiles
     quantile_threshold = 0.05
 
-    summary.update(
-        {
-            f"{percentile:.0%}": value
-            for percentile, value in zip(
-                quantiles,
-                df.stat.approxQuantile(
-                    f"{df.columns[0]}",
-                    quantiles,
-                    quantile_threshold,
-                ),
-            )
-        }
-    )
+    if summary.get("n") == summary.get("n_missing"):
+        # The entire column is null/NaN, so these summary values must be hard-coded:
+        summary.update({f"{percentile:.0%}": np.nan for percentile in quantiles})
+
+        summary["mad"] = np.nan
+        summary["iqr"] = np.nan
+
+    else:
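+        # At least one non-null value exists; derive quantile-based stats from the data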
+        summary.update(
+            {
+                f"{percentile:.0%}": value
+                for percentile, value in zip(
+                    quantiles,
+                    df.stat.approxQuantile(
+                        f"{df.columns[0]}",
+                        quantiles,
+                        quantile_threshold,
+                    ),
+                )
+            }
+        )
 
-    median = summary["50%"]
+        median = summary.get("50%")
 
-    summary["mad"] = df.select(
-        (F.abs(F.col(f"{df.columns[0]}").cast("int") - median)).alias("abs_dev")
-    ).stat.approxQuantile("abs_dev", [0.5], quantile_threshold)[0]
+        summary["mad"] = df.select(
+            (F.abs(F.col(f"{df.columns[0]}").cast("int") - median)).alias("abs_dev")
+        ).stat.approxQuantile("abs_dev", [0.5], quantile_threshold)[0]
+
+        summary["iqr"] = summary["75%"] - summary["25%"]
 
     # FIXME: move to fmt
     summary["p_negative"] = summary["n_negative"] / summary["n"]
-    summary["range"] = summary["max"] - summary["min"]
-    summary["iqr"] = summary["75%"] - summary["25%"]
+    if summary["min"] is None or summary["max"] is None:
+        summary["range"] = np.nan
+    else:
+        summary["range"] = summary["max"] - summary["min"]
     summary["cv"] = summary["std"] / summary["mean"] if summary["mean"] else np.nan
     summary["p_zeros"] = summary["n_zeros"] / summary["n"]
     summary["p_infinite"] = summary["n_infinite"] / summary["n"]
diff --git a/tests/issues/test_issue1429.py b/tests/issues/test_issue1429.py
new file mode 100644
index 000000000..6599931d5
--- /dev/null
+++ b/tests/issues/test_issue1429.py
@@ -0,0 +1,97 @@
+"""
+Test for issue 1429:
+https://github.com/ydataai/ydata-profiling/issues/1429
+"""
+import numpy as np
+from pyspark.sql import DataFrame, SparkSession
+from pyspark.sql import types as T
+
+from ydata_profiling.config import SparkSettings
+from ydata_profiling.model.spark.describe_counts_spark import describe_counts_spark
+from ydata_profiling.model.spark.describe_generic_spark import describe_generic_spark
+from ydata_profiling.model.spark.describe_numeric_spark import (
+    describe_numeric_1d_spark,
+    numeric_stats_spark,
+)
+from ydata_profiling.model.spark.describe_supported_spark import (
+    describe_supported_spark,
+)
+
+config = SparkSettings()
+
+
+def create_test_df(spark: SparkSession) -> DataFrame:
+    schema = T.StructType(
+        [
+            T.StructField("category", T.StringType(), True),
+            T.StructField("double", T.DoubleType(), True),
+            T.StructField("int", T.IntegerType(), True),
+            T.StructField("boolean", T.BooleanType(), True),
+            T.StructField("null_double", T.DoubleType(), True),
+        ]
+    )
+
+    data = [
+        (f"test_{num + 1}", float(num), int(num), True, None) for num in range(205)
+    ]
+
+    # Add duplicates so value counts exceed the former 200-row limit
+    data.extend([("test_1", float(1), int(1), False, None) for _ in range(205)])
+
+    # Add fully-null rows
+    data.extend([(None, None, None, None, None) for _ in range(100)])
+
+    return spark.createDataFrame(data, schema=schema)
+
+
+def test_describe_numeric_spark(spark_session):
+    test_df = create_test_df(spark_session)
+
+    numeric_stats = numeric_stats_spark(df=test_df.select("double"), summary={})
+
+    for value in numeric_stats.values():
+        assert value is not None
+
+
+def test_describe_numeric_1d_spark_for_null_column_edge_case(spark_session):
+    test_df = create_test_df(spark_session)
+
+    _, _, summary = describe_counts_spark(
+        config=config, series=test_df.select("null_double"), summary={}
+    )
+
+    _, _, summary = describe_generic_spark(
+        config=config, df=test_df.select("null_double"), summary=summary
+    )
+
+    _, _, summary = describe_supported_spark(
+        config=config, series=test_df.select("null_double"), summary=summary
+    )
+
+    _, _, summary = describe_numeric_1d_spark(
+        config=config, df=test_df.select("null_double"), summary=summary
+    )
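+
+    # With every value null, the derived statistics should be NaN/None and the histogram empty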
+    assert np.isnan(summary["iqr"])
+    assert np.isnan(summary["mad"])
+    assert np.isnan(summary["cv"])
+    assert summary["mean"] is None
+    assert summary["histogram"] == []
+
+
+def test_describe_counts_spark(spark_session):
+    test_df = create_test_df(spark_session)
+
+    _, _, summary = describe_counts_spark(
+        config=config, series=test_df.select("category"), summary={}
+    )
+    assert summary["value_counts_without_nan"].loc["test_1"] == 206
+
+    _, _, summary = describe_counts_spark(
+        config=config, series=test_df.select("double"), summary={}
+    )
+    assert summary["value_counts_without_nan"].loc[float(1)] == 206
+
+    _, _, summary = describe_counts_spark(
+        config=config, series=test_df.select("int"), summary={}
+    )
+    assert summary["value_counts_without_nan"].loc[int(1)] == 206
+
+    _, _, summary = describe_counts_spark(
+        config=config, series=test_df.select("boolean"), summary={}
+    )
+    assert summary["value_counts_without_nan"].loc[True] == 205
+
+    _, _, summary = describe_counts_spark(
+        config=config, series=test_df.select("null_double"), summary={}
+    )
+    assert summary["value_counts_without_nan"].size == 0
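
For reference, a minimal sketch of the null/NaN asymmetry the first hunk works around. This is illustrative only and not part of the patch; it assumes a local SparkSession is available:

    # Illustrative sketch: Pandas isnull() counts NaN as missing, but Spark's
    # isNull() does not, so a DoubleType column holding float("nan") looks
    # complete under isNull() alone.
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df = spark.createDataFrame([(1.0,), (float("nan"),), (None,)], ["x"])

    assert df.filter(F.col("x").isNull()).count() == 1  # misses the NaN row
    assert df.filter(F.col("x").isNull() | F.isnan(F.col("x"))).count() == 2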