Skip to content

Commit 45d76ea

Browse files
authored
fix: Multiple Spark Enhancements (#1800)
* fix: update spark numeric stats calculations In the Pandas implementation, the numeric stats like min/max/stddev/etc. by default ignore null values. This commit updates the spark implementation to more closely match that. * fix: update spark null checks for describe_counts_spark method Need to add the isnan() check because Pandas isnull check will count NaN as null, but Spark does not. * fix: update Spark frequency counts The previous calculation of counts was actually counting an already summarized dataframe, so it wasn't capturing the correct counts for each instance of a value. This is updated by summing the count value instead of performing a row count. * fix: adding tests for issue 1429 * fix: edge case - completely null numeric field in Spark Discovered this edge case with real data, and still need to fix the rendering of an empty histogram. * fix: add handling for spark DecimalType This change addresses issue #1602. Computations in the summarize process result in some floats when computing against decimal columns.To solution this, we simply convert those types to a DoubleType when performing those numeric operations. * fix: add handling for spark correlations with no numeric fields Assembling a vector column in Spark with no numeric columns results in features with a NULL size, NULL indices, and an empty list of values. This causes an exception to be raised when computing correlations. The solution here is to avoid computing the correlation matrix when there are no interval columns (numeric). This change addresses issue #1722. * fix: allow NoneType values to be string formatted This change addresses issue #1723. It implements a "N/A" string as the default when formatting NoneType values. * fix: multiple Spark fixes to approach closer parity to Pandas profiles Addresses handling of completely null numeric columns, and gracefully handling empty correlation sets and plots.
1 parent 0450215 commit 45d76ea

23 files changed

Lines changed: 658 additions & 187 deletions

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ var/
2424
*.egg-info/
2525
.installed.cfg
2626
*.egg
27+
uv.lock
2728

2829
# PyInstaller
2930
# Usually these files are written by a python script from a template
@@ -85,3 +86,6 @@ docsrc/source/pages/api/_autosummary/
8586
# User created
8687
VERSION
8788
version.py
89+
90+
# local java/spark sdk environment files
91+
.sdkmanrc

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ dev = [
8787
"sphinx-autodoc-typehints>=1.10.3",
8888
"sphinx-multiversion>=0.2.3",
8989
"autodoc_pydantic",
90+
"standard-imghdr"
9091
]
9192

9293
docs = [

src/ydata_profiling/model/spark/correlations_spark.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,23 @@ def _compute_corr_natively(df: DataFrame, summary: dict, corr_type: str) -> Arra
4545
exact same workflow. The syntax is Correlation.corr(dataframe, method="pearson" OR "spearman"),
4646
and Correlation is from pyspark.ml.stat
4747
"""
48-
variables = {column: description["type"] for column, description in summary.items()}
48+
variables = {
49+
column: {
50+
"type": description["type"],
51+
"all_missing": description.get("n", 0) - description.get("n_missing", 0)
52+
== 0,
53+
}
54+
for column, description in summary.items()
55+
}
4956
interval_columns = [
50-
column for column, type_name in variables.items() if type_name == "Numeric"
57+
column
58+
for column, mapping in variables.items()
59+
if mapping["type"] == "Numeric" and not mapping["all_missing"]
5160
]
61+
62+
if not interval_columns:
63+
return [], interval_columns
64+
5265
df = df.select(*interval_columns)
5366

5467
# convert to vector column first
@@ -63,6 +76,9 @@ def _compute_corr_natively(df: DataFrame, summary: dict, corr_type: str) -> Arra
6376
assembler = VectorAssembler(**assembler_args)
6477
df_vector = assembler.transform(df).select(vector_col)
6578

79+
if df_vector.count() <= 1:
80+
return [], interval_columns
81+
6682
# get correlation matrix
6783
matrix = (
6884
Correlation.corr(df_vector, vector_col, method=corr_type).head()[0].toArray()

src/ydata_profiling/model/spark/describe_counts_spark.py

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import pandas as pd
77
from pyspark.sql import DataFrame
88
from pyspark.sql import functions as F
9+
from pyspark.sql import types as T
910

1011
from ydata_profiling.config import Settings
1112
from ydata_profiling.model.summary_algorithms import describe_counts
@@ -25,6 +26,11 @@ def describe_counts_spark(
2526
Returns:
2627
Updated settings, input series, and summary dictionary.
2728
"""
29+
# Cast Decimal Type s
30+
if isinstance(series.schema.fields[0].dataType, T.DecimalType):
31+
series = series.select(
32+
F.col(series.columns[0]).cast(T.DoubleType()).alias(series.columns[0])
33+
)
2834

2935
# Count occurrences of each value
3036
value_counts = series.groupBy(series.columns[0]).count()
@@ -36,9 +42,23 @@ def describe_counts_spark(
3642
value_counts_index_sorted = value_counts.orderBy(F.asc(series.columns[0]))
3743

3844
# Count missing values
39-
n_missing = (
40-
value_counts.filter(F.col(series.columns[0]).isNull()).select("count").first()
41-
)
45+
if series.dtypes[0][1] in ("int", "float", "bigint", "double"):
46+
n_missing = (
47+
# Need to add the isnan() check because Pandas isnull check will count NaN as null, but Spark does not
48+
value_counts.filter(
49+
F.col(series.columns[0]).isNull() | F.isnan(F.col(series.columns[0]))
50+
)
51+
.select("count")
52+
.first()
53+
)
54+
else:
55+
n_missing = (
56+
# Need to add the isnan() check because Pandas isnull check will count NaN as null, but Spark does not
57+
value_counts.filter(F.col(series.columns[0]).isNull())
58+
.select("count")
59+
.first()
60+
)
61+
4262
n_missing = n_missing["count"] if n_missing else 0
4363

4464
# Convert top 200 values to Pandas for frequency table display
@@ -60,17 +80,15 @@ def describe_counts_spark(
6080
value_counts.filter(F.col(column).isNotNull()) # Exclude NaNs
6181
.filter(~F.isnan(F.col(column))) # Remove implicit NaNs (if numeric column)
6282
.groupBy(column) # Group by unique values
63-
.count() # Count occurrences
83+
.agg(F.sum("count").alias("count")) # Sum of count
6484
.orderBy(F.desc("count")) # Sort in descending order
65-
.limit(200) # Limit for performance
6685
)
6786
else:
6887
value_counts_no_nan = (
6988
value_counts.filter(F.col(column).isNotNull()) # Exclude NULLs
7089
.groupBy(column) # Group by unique timestamp values
71-
.count() # Count occurrences
90+
.agg(F.sum("count").alias("count")) # Sum of count
7291
.orderBy(F.desc("count")) # Sort by most frequent timestamps
73-
.limit(200) # Limit for performance
7492
)
7593

7694
# Convert to Pandas Series, forcing proper structure

src/ydata_profiling/model/spark/describe_numeric_spark.py

Lines changed: 39 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,14 @@
1111
def numeric_stats_spark(df: DataFrame, summary: dict) -> dict:
1212
column = df.columns[0]
1313

14+
# Removing null types from numeric summary stats to match Pandas defaults which skip na's (skipna=False)
15+
finite_filter = (
16+
F.col(column).isNotNull()
17+
& ~F.isnan(F.col(column))
18+
& ~F.col(column).isin([np.inf, -np.inf])
19+
)
20+
non_null_df = df.filter(finite_filter)
21+
1422
expr = [
1523
F.mean(F.col(column)).alias("mean"),
1624
F.stddev(F.col(column)).alias("std"),
@@ -21,7 +29,7 @@ def numeric_stats_spark(df: DataFrame, summary: dict) -> dict:
2129
F.skewness(F.col(column)).alias("skewness"),
2230
F.sum(F.col(column)).alias("sum"),
2331
]
24-
return df.agg(*expr).first().asDict()
32+
return non_null_df.agg(*expr).first().asDict()
2533

2634

2735
def describe_numeric_1d_spark(
@@ -81,30 +89,42 @@ def describe_numeric_1d_spark(
8189
quantiles = config.vars.num.quantiles
8290
quantile_threshold = 0.05
8391

84-
summary.update(
85-
{
86-
f"{percentile:.0%}": value
87-
for percentile, value in zip(
88-
quantiles,
89-
df.stat.approxQuantile(
90-
f"{df.columns[0]}",
92+
if summary.get("n") == summary.get("n_missing"):
93+
# This means the entire column is null/nan, so summary values need to be hard-coded:
94+
summary.update({f"{percentile:.0%}": np.nan for percentile in quantiles})
95+
96+
summary["mad"] = np.nan
97+
summary["iqr"] = np.nan
98+
99+
else:
100+
summary.update(
101+
{
102+
f"{percentile:.0%}": value
103+
for percentile, value in zip(
91104
quantiles,
92-
quantile_threshold,
93-
),
94-
)
95-
}
96-
)
105+
df.stat.approxQuantile(
106+
f"{df.columns[0]}",
107+
quantiles,
108+
quantile_threshold,
109+
),
110+
)
111+
}
112+
)
97113

98-
median = summary["50%"]
114+
median = summary.get("50%")
99115

100-
summary["mad"] = df.select(
101-
(F.abs(F.col(f"{df.columns[0]}").cast("int") - median)).alias("abs_dev")
102-
).stat.approxQuantile("abs_dev", [0.5], quantile_threshold)[0]
116+
summary["mad"] = df.select(
117+
(F.abs(F.col(f"{df.columns[0]}").cast("int") - median)).alias("abs_dev")
118+
).stat.approxQuantile("abs_dev", [0.5], quantile_threshold)[0]
119+
120+
summary["iqr"] = summary["75%"] - summary["25%"]
103121

104122
# FIXME: move to fmt
105123
summary["p_negative"] = summary["n_negative"] / summary["n"]
106-
summary["range"] = summary["max"] - summary["min"]
107-
summary["iqr"] = summary["75%"] - summary["25%"]
124+
if summary["min"] is None or summary["max"] is None:
125+
summary["range"] = np.nan
126+
else:
127+
summary["range"] = summary["max"] - summary["min"]
108128
summary["cv"] = summary["std"] / summary["mean"] if summary["mean"] else np.nan
109129
summary["p_zeros"] = summary["n_zeros"] / summary["n"]
110130
summary["p_infinite"] = summary["n_infinite"] / summary["n"]

src/ydata_profiling/model/spark/describe_supported_spark.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,15 @@ def describe_supported_spark(
1919
"""
2020

2121
# number of non-NaN observations in the Series
22-
count = summary["count"]
23-
n_distinct = summary["value_counts"].count()
22+
n = summary["n"]
23+
n_distinct = summary["value_counts"].count() # Null/nan's count in value_counts
2424

2525
summary["n_distinct"] = n_distinct
26-
summary["p_distinct"] = n_distinct / count if count > 0 else 0
26+
summary["p_distinct"] = n_distinct / n if n > 0 else 0
2727

2828
n_unique = summary["value_counts"].where("count == 1").count()
29-
summary["is_unique"] = n_unique == count
29+
summary["is_unique"] = n_unique == n
3030
summary["n_unique"] = n_unique
31-
summary["p_unique"] = n_unique / count if count > 0 else 0
31+
summary["p_unique"] = n_unique / n if n > 0 else 0
3232

3333
return config, series, summary

src/ydata_profiling/model/spark/summary_spark.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ def spark_describe_1d(
4343

4444
if str(series.schema[0].dataType).startswith("ArrayType"):
4545
dtype = "ArrayType"
46+
elif str(series.schema[0].dataType).startswith("Decimal"):
47+
dtype = "decimal"
4648
else:
4749
dtype = series.schema[0].dataType.simpleString()
4850

@@ -56,6 +58,7 @@ def spark_describe_1d(
5658
"boolean": "Boolean",
5759
"date": "DateTime",
5860
"timestamp": "DateTime",
61+
"decimal": "Numeric",
5962
}[dtype]
6063

6164
return summarizer.summarize(config, series, dtype=vtype)

src/ydata_profiling/report/formatters.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -245,13 +245,16 @@ def fmt_numeric(value: float, precision: int = 10) -> str:
245245
Returns:
246246
The numeric value with the given precision.
247247
"""
248-
fmtted = f"{{:.{precision}g}}".format(value)
249-
for v in ["e+", "e-"]:
250-
if v in fmtted:
251-
sign = "-" if v in "e-" else ""
252-
fmtted = fmtted.replace(v, " × 10<sup>") + "</sup>"
253-
fmtted = fmtted.replace("<sup>0", "<sup>")
254-
fmtted = fmtted.replace("<sup>", f"<sup>{sign}")
248+
if value is None:
249+
fmtted = "N/A"
250+
else:
251+
fmtted = f"{{:.{precision}g}}".format(value)
252+
for v in ["e+", "e-"]:
253+
if v in fmtted:
254+
sign = "-" if v in "e-" else ""
255+
fmtted = fmtted.replace(v, " × 10<sup>") + "</sup>"
256+
fmtted = fmtted.replace("<sup>0", "<sup>")
257+
fmtted = fmtted.replace("<sup>", f"<sup>{sign}")
255258

256259
return fmtted
257260

src/ydata_profiling/report/structure/variables/render_categorical.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from ydata_profiling.report.presentation.core.renderable import Renderable
2424
from ydata_profiling.report.presentation.frequency_table_utils import freq_table
2525
from ydata_profiling.report.structure.variables.render_common import render_common
26+
from ydata_profiling.report.utils import image_or_empty
2627
from ydata_profiling.visualisation.plot import cat_frequency_plot, histogram
2728

2829

@@ -95,7 +96,7 @@ def render_categorical_length(
9596
else:
9697
hist_data = histogram(config, *summary["histogram_length"])
9798

98-
length_histo = Image(
99+
length_histo = image_or_empty(
99100
hist_data,
100101
image_format=config.plot.image_format,
101102
alt="length histogram",

src/ydata_profiling/report/structure/variables/render_count.py

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@
88
from ydata_profiling.report.presentation.core import (
99
Container,
1010
FrequencyTable,
11-
Image,
1211
Table,
1312
VariableInfo,
1413
)
1514
from ydata_profiling.report.structure.variables.render_common import render_common
15+
from ydata_profiling.report.utils import image_or_empty
1616
from ydata_profiling.visualisation.plot import histogram, mini_histogram
1717

1818

@@ -94,26 +94,41 @@ def render_count(config: Settings, summary: dict) -> dict:
9494
style=config.html.style,
9595
)
9696

97-
mini_histo = Image(
98-
mini_histogram(config, *summary["histogram"]),
99-
image_format=image_format,
97+
summary_histogram = summary.get("histogram", [])
98+
99+
mini_hist_data = None
100+
101+
if summary_histogram:
102+
mini_hist_data = mini_histogram(config, *summary["histogram"])
103+
104+
mini_histo = image_or_empty(
105+
mini_hist_data,
100106
alt="Mini histogram",
107+
image_format=image_format,
108+
name="Mini Histogram",
101109
)
102110

103111
template_variables["top"] = Container(
104112
[info, table1, table2, mini_histo], sequence_type="grid"
105113
)
106114

107-
seqs = [
108-
Image(
109-
histogram(config, *summary["histogram"]),
110-
image_format=image_format,
111-
alt="Histogram",
112-
caption=f"<strong>Histogram with fixed size bins</strong> (bins={len(summary['histogram'][1]) - 1})",
113-
name="Histogram",
114-
anchor_id="histogram",
115-
)
116-
]
115+
hist_data = None
116+
hist_caption = None
117+
118+
if summary_histogram:
119+
hist_data = histogram(config, *summary["histogram"])
120+
hist_caption = f"<strong>Histogram with fixed size bins</strong> (bins={len(summary['histogram'][1]) - 1})"
121+
122+
hist = image_or_empty(
123+
hist_data,
124+
alt="Histogram",
125+
image_format=image_format,
126+
caption=hist_caption,
127+
name="Histogram",
128+
anchor_id="histogram",
129+
)
130+
131+
seqs = [hist]
117132

118133
fq = FrequencyTable(
119134
template_variables["freq_table_rows"],

0 commit comments

Comments
 (0)