4 changes: 4 additions & 0 deletions .gitignore
@@ -24,6 +24,7 @@ var/
*.egg-info/
.installed.cfg
*.egg
uv.lock

# PyInstaller
# Usually these files are written by a python script from a template
@@ -85,3 +86,6 @@ docsrc/source/pages/api/_autosummary/
# User created
VERSION
version.py

# local java/spark sdk environment files
.sdkmanrc
1 change: 1 addition & 0 deletions pyproject.toml
@@ -87,6 +87,7 @@ dev = [
"sphinx-autodoc-typehints>=1.10.3",
"sphinx-multiversion>=0.2.3",
"autodoc_pydantic",
"standard-imghdr"
]

docs = [
20 changes: 18 additions & 2 deletions src/ydata_profiling/model/spark/correlations_spark.py
@@ -45,10 +45,23 @@ def _compute_corr_natively(df: DataFrame, summary: dict, corr_type: str) -> Arra
exact same workflow. The syntax is Correlation.corr(dataframe, method="pearson" OR "spearman"),
and Correlation is from pyspark.ml.stat
"""
variables = {column: description["type"] for column, description in summary.items()}
variables = {
    column: {
        "type": description["type"],
        "all_missing": description.get("n", 0) - description.get("n_missing", 0)
        == 0,
    }
    for column, description in summary.items()
}
interval_columns = [
    column for column, type_name in variables.items() if type_name == "Numeric"
    column
    for column, mapping in variables.items()
    if mapping["type"] == "Numeric" and not mapping["all_missing"]
]

if not interval_columns:
    return [], interval_columns

df = df.select(*interval_columns)

# convert to vector column first
@@ -63,6 +76,9 @@ def _compute_corr_natively(df: DataFrame, summary: dict, corr_type: str) -> Arra
assembler = VectorAssembler(**assembler_args)
df_vector = assembler.transform(df).select(vector_col)

if df_vector.count() <= 1:
    return [], interval_columns

# get correlation matrix
matrix = (
    Correlation.corr(df_vector, vector_col, method=corr_type).head()[0].toArray()
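For context, the guarded flow above boils down to the following minimal sketch. The local SparkSession, the data, and the column names are illustrative, and handleInvalid="skip" stands in for the assembler_args built earlier in the file, which this hunk does not show:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([(1.0, 2.0), (2.0, 4.0), (3.0, 6.0)], ["a", "b"])

interval_columns = ["a", "b"]  # numeric columns that are not entirely missing
if not interval_columns:
    matrix = []  # nothing to correlate: skip the Spark job entirely
else:
    assembler = VectorAssembler(
        inputCols=interval_columns, outputCol="features", handleInvalid="skip"
    )
    df_vector = assembler.transform(df).select("features")
    if df_vector.count() <= 1:
        matrix = []  # a correlation over zero or one row is meaningless
    else:
        matrix = (
            Correlation.corr(df_vector, "features", method="spearman")
            .head()[0]
            .toArray()
        )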
32 changes: 25 additions & 7 deletions src/ydata_profiling/model/spark/describe_counts_spark.py
@@ -6,6 +6,7 @@
import pandas as pd
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql import types as T

from ydata_profiling.config import Settings
from ydata_profiling.model.summary_algorithms import describe_counts
@@ -25,6 +26,11 @@ def describe_counts_spark(
Returns:
Updated settings, input series, and summary dictionary.
"""
# Cast DecimalType columns to DoubleType
if isinstance(series.schema.fields[0].dataType, T.DecimalType):
    series = series.select(
        F.col(series.columns[0]).cast(T.DoubleType()).alias(series.columns[0])
    )

# Count occurrences of each value
value_counts = series.groupBy(series.columns[0]).count()
@@ -36,9 +42,23 @@ def describe_counts_spark(
value_counts_index_sorted = value_counts.orderBy(F.asc(series.columns[0]))

# Count missing values
n_missing = (
    value_counts.filter(F.col(series.columns[0]).isNull()).select("count").first()
)
if series.dtypes[0][1] in ("int", "float", "bigint", "double"):
    n_missing = (
        # Need the isnan() check too: Pandas' isnull counts NaN as null, but Spark does not
        value_counts.filter(
            F.col(series.columns[0]).isNull() | F.isnan(F.col(series.columns[0]))
        )
        .select("count")
        .first()
    )
else:
    n_missing = (
        # isnan() does not apply to non-numeric columns, so only NULLs are counted here
        value_counts.filter(F.col(series.columns[0]).isNull())
        .select("count")
        .first()
    )

n_missing = n_missing["count"] if n_missing else 0

# Convert top 200 values to Pandas for frequency table display
@@ -60,17 +80,15 @@
    value_counts.filter(F.col(column).isNotNull())  # Exclude NaNs
    .filter(~F.isnan(F.col(column)))  # Remove implicit NaNs (if numeric column)
    .groupBy(column)  # Group by unique values
    .count()  # Count occurrences
    .agg(F.sum("count").alias("count"))  # Sum of count
    .orderBy(F.desc("count"))  # Sort in descending order
    .limit(200)  # Limit for performance
)
else:
    value_counts_no_nan = (
        value_counts.filter(F.col(column).isNotNull())  # Exclude NULLs
        .groupBy(column)  # Group by unique timestamp values
        .count()  # Count occurrences
        .agg(F.sum("count").alias("count"))  # Sum of count
        .orderBy(F.desc("count"))  # Sort by most frequent timestamps
        .limit(200)  # Limit for performance
    )

# Convert to Pandas Series, forcing proper structure
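The split between the numeric and non-numeric branches exists because Spark distinguishes NULL from NaN, while Pandas' isnull treats both as missing. A tiny illustration, assuming a local session and made-up data:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([(1.0,), (float("nan"),), (None,)], "x double")

df.filter(F.col("x").isNull()).count()                        # 1: NULL only
df.filter(F.col("x").isNull() | F.isnan(F.col("x"))).count()  # 2: what Pandas would call missing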
58 changes: 39 additions & 19 deletions src/ydata_profiling/model/spark/describe_numeric_spark.py
@@ -11,6 +11,14 @@
def numeric_stats_spark(df: DataFrame, summary: dict) -> dict:
column = df.columns[0]

# Drop nulls, NaNs and infinities from the numeric summary stats to match the Pandas default, which skips NAs (skipna=True)
finite_filter = (
    F.col(column).isNotNull()
    & ~F.isnan(F.col(column))
    & ~F.col(column).isin([np.inf, -np.inf])
)
non_null_df = df.filter(finite_filter)

expr = [
    F.mean(F.col(column)).alias("mean"),
    F.stddev(F.col(column)).alias("std"),
@@ -21,7 +29,7 @@ def numeric_stats_spark(df: DataFrame, summary: dict) -> dict:
    F.skewness(F.col(column)).alias("skewness"),
    F.sum(F.col(column)).alias("sum"),
]
return df.agg(*expr).first().asDict()
return non_null_df.agg(*expr).first().asDict()


def describe_numeric_1d_spark(
@@ -81,30 +89,42 @@ def describe_numeric_1d_spark(
quantiles = config.vars.num.quantiles
quantile_threshold = 0.05

summary.update(
    {
        f"{percentile:.0%}": value
        for percentile, value in zip(
            quantiles,
            df.stat.approxQuantile(
                f"{df.columns[0]}",
                quantiles,
                quantile_threshold,
            ),
        )
    }
)
if summary.get("n") == summary.get("n_missing"):
    # This means the entire column is null/NaN, so summary values need to be hard-coded:
    summary.update({f"{percentile:.0%}": np.nan for percentile in quantiles})

    summary["mad"] = np.nan
    summary["iqr"] = np.nan

else:
    summary.update(
        {
            f"{percentile:.0%}": value
            for percentile, value in zip(
                quantiles,
                df.stat.approxQuantile(
                    f"{df.columns[0]}",
                    quantiles,
                    quantile_threshold,
                ),
            )
        }
    )

median = summary["50%"]
    median = summary.get("50%")

summary["mad"] = df.select(
    (F.abs(F.col(f"{df.columns[0]}").cast("int") - median)).alias("abs_dev")
).stat.approxQuantile("abs_dev", [0.5], quantile_threshold)[0]
    summary["mad"] = df.select(
        (F.abs(F.col(f"{df.columns[0]}").cast("int") - median)).alias("abs_dev")
    ).stat.approxQuantile("abs_dev", [0.5], quantile_threshold)[0]

summary["iqr"] = summary["75%"] - summary["25%"]

# FIXME: move to fmt
summary["p_negative"] = summary["n_negative"] / summary["n"]
summary["range"] = summary["max"] - summary["min"]
summary["iqr"] = summary["75%"] - summary["25%"]
if summary["min"] is None or summary["max"] is None:
    summary["range"] = np.nan
else:
    summary["range"] = summary["max"] - summary["min"]
summary["cv"] = summary["std"] / summary["mean"] if summary["mean"] else np.nan
summary["p_zeros"] = summary["n_zeros"] / summary["n"]
summary["p_infinite"] = summary["n_infinite"] / summary["n"]
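The all-missing guard matters because approxQuantile ignores null and NaN values and returns an empty list when nothing is left, so the unguarded code would fail when it indexes into the result. A small reproduction, assuming a local session and made-up data:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
all_null = spark.createDataFrame([(None,), (None,)], "x double")

# Returns [] rather than [nan, nan, nan]; indexing it would raise IndexError.
print(all_null.stat.approxQuantile("x", [0.25, 0.5, 0.75], 0.05))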
10 changes: 5 additions & 5 deletions src/ydata_profiling/model/spark/describe_supported_spark.py
@@ -19,15 +19,15 @@ def describe_supported_spark(
"""

# number of non-NaN observations in the Series
count = summary["count"]
n_distinct = summary["value_counts"].count()
n = summary["n"]
n_distinct = summary["value_counts"].count()  # value_counts keeps the null/NaN group, so it is included here

summary["n_distinct"] = n_distinct
summary["p_distinct"] = n_distinct / count if count > 0 else 0
summary["p_distinct"] = n_distinct / n if n > 0 else 0

n_unique = summary["value_counts"].where("count == 1").count()
summary["is_unique"] = n_unique == count
summary["is_unique"] = n_unique == n
summary["n_unique"] = n_unique
summary["p_unique"] = n_unique / count if count > 0 else 0
summary["p_unique"] = n_unique / n if n > 0 else 0

return config, series, summary
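As a worked example of the bookkeeping above: the value_counts DataFrame keeps NULL as its own group, which is why the total row count n, rather than the non-missing count, is the consistent denominator. Assuming a local session and made-up data:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
series = spark.createDataFrame([("a",), ("a",), ("b",), (None,)], "x string")

value_counts = series.groupBy("x").count()
n = series.count()                                   # 4 rows, NULL included
n_distinct = value_counts.count()                    # 3: "a", "b" and the NULL group
n_unique = value_counts.where("count == 1").count()  # 2: "b" and the NULL group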
3 changes: 3 additions & 0 deletions src/ydata_profiling/model/spark/summary_spark.py
@@ -43,6 +43,8 @@ def spark_describe_1d(

if str(series.schema[0].dataType).startswith("ArrayType"):
    dtype = "ArrayType"
elif str(series.schema[0].dataType).startswith("Decimal"):
    dtype = "decimal"
else:
    dtype = series.schema[0].dataType.simpleString()

@@ -56,6 +58,7 @@
"boolean": "Boolean",
"date": "DateTime",
"timestamp": "DateTime",
"decimal": "Numeric",
}[dtype]

return summarizer.summarize(config, series, dtype=vtype)
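The extra elif is needed because simpleString() embeds precision and scale for decimals, so the bare "decimal" key in the type map would never match. For example:

from pyspark.sql import types as T

print(T.DecimalType(10, 2).simpleString())  # decimal(10,2)
print(str(T.DecimalType(10, 2)))            # DecimalType(10,2), hence startswith("Decimal")
print(T.DoubleType().simpleString())        # double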
17 changes: 10 additions & 7 deletions src/ydata_profiling/report/formatters.py
Original file line number Diff line number Diff line change
@@ -245,13 +245,16 @@ def fmt_numeric(value: float, precision: int = 10) -> str:
Returns:
The numeric value with the given precision.
"""
fmtted = f"{{:.{precision}g}}".format(value)
for v in ["e+", "e-"]:
if v in fmtted:
sign = "-" if v in "e-" else ""
fmtted = fmtted.replace(v, " × 10<sup>") + "</sup>"
fmtted = fmtted.replace("<sup>0", "<sup>")
fmtted = fmtted.replace("<sup>", f"<sup>{sign}")
if value is None:
fmtted = "N/A"
else:
fmtted = f"{{:.{precision}g}}".format(value)
for v in ["e+", "e-"]:
if v in fmtted:
sign = "-" if v in "e-" else ""
fmtted = fmtted.replace(v, " × 10<sup>") + "</sup>"
fmtted = fmtted.replace("<sup>0", "<sup>")
fmtted = fmtted.replace("<sup>", f"<sup>{sign}")

return fmtted

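With the guard in place, the formatter degrades gracefully for the all-missing summaries produced above. Expected behaviour per the logic shown (the exact strings follow from Python's :g formatting):

from ydata_profiling.report.formatters import fmt_numeric

fmt_numeric(1234.5)   # '1234.5'
fmt_numeric(1.5e12)   # '1.5 × 10<sup>12</sup>'
fmt_numeric(None)     # 'N/A' instead of a TypeError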
src/ydata_profiling/report/structure/variables/render_categorical.py
@@ -23,6 +23,7 @@
from ydata_profiling.report.presentation.core.renderable import Renderable
from ydata_profiling.report.presentation.frequency_table_utils import freq_table
from ydata_profiling.report.structure.variables.render_common import render_common
from ydata_profiling.report.utils import image_or_empty
from ydata_profiling.visualisation.plot import cat_frequency_plot, histogram


@@ -95,7 +96,7 @@ def render_categorical_length(
else:
    hist_data = histogram(config, *summary["histogram_length"])

length_histo = Image(
length_histo = image_or_empty(
    hist_data,
    image_format=config.plot.image_format,
    alt="length histogram",
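image_or_empty itself is not shown in this diff; a plausible sketch of its contract, assuming it lives alongside the other report utilities, is:

from ydata_profiling.report.presentation.core import HTML, Image


def image_or_empty(image_data, **kwargs):
    """Render an Image if there is plot data, otherwise an empty HTML block."""
    if not image_data:
        return HTML("")
    return Image(image_data, **kwargs)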
43 changes: 29 additions & 14 deletions src/ydata_profiling/report/structure/variables/render_count.py
@@ -8,11 +8,11 @@
from ydata_profiling.report.presentation.core import (
    Container,
    FrequencyTable,
    Image,
    Table,
    VariableInfo,
)
from ydata_profiling.report.structure.variables.render_common import render_common
from ydata_profiling.report.utils import image_or_empty
from ydata_profiling.visualisation.plot import histogram, mini_histogram


@@ -94,26 +94,41 @@ def render_count(config: Settings, summary: dict) -> dict:
    style=config.html.style,
)

mini_histo = Image(
    mini_histogram(config, *summary["histogram"]),
    image_format=image_format,
summary_histogram = summary.get("histogram", [])

mini_hist_data = None

if summary_histogram:
    mini_hist_data = mini_histogram(config, *summary["histogram"])

mini_histo = image_or_empty(
    mini_hist_data,
    alt="Mini histogram",
    image_format=image_format,
    name="Mini Histogram",
)

template_variables["top"] = Container(
[info, table1, table2, mini_histo], sequence_type="grid"
)

seqs = [
    Image(
        histogram(config, *summary["histogram"]),
        image_format=image_format,
        alt="Histogram",
        caption=f"<strong>Histogram with fixed size bins</strong> (bins={len(summary['histogram'][1]) - 1})",
        name="Histogram",
        anchor_id="histogram",
    )
]
hist_data = None
hist_caption = None

if summary_histogram:
    hist_data = histogram(config, *summary["histogram"])
    hist_caption = f"<strong>Histogram with fixed size bins</strong> (bins={len(summary['histogram'][1]) - 1})"

hist = image_or_empty(
    hist_data,
    alt="Histogram",
    image_format=image_format,
    caption=hist_caption,
    name="Histogram",
    anchor_id="histogram",
)

seqs = [hist]

fq = FrequencyTable(
template_variables["freq_table_rows"],
Expand Down