Skip to content

Commit 0379cd1

Browse files
TeddyCr authored and OpenMetadata Release Bot committed
fix: unaligned metric signature (#27370)
* fix: unaligned metric signature
* fix: added dimension imp. for mariadb and singlestore + integration tests
* fix: ci failure
* fix: ci failure
* fix: ci failure
* fix: ci failures

(cherry picked from commit 5204148)
1 parent 0d23ef8 commit 0379cd1

12 files changed

Lines changed: 548 additions & 11 deletions

File tree

ingestion/src/metadata/profiler/source/database/mariadb/functions/median.py

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -12,9 +12,12 @@ class MariaDBMedianFn(FunctionElement):
1212

1313
@compiles(MariaDBMedianFn)
1414
def _(elements, compiler, **kwargs): # pylint: disable=unused-argument
15-
col = compiler.process(elements.clauses.clauses[0])
16-
percentile = elements.clauses.clauses[2].value
15+
clauses = elements.clauses.clauses
16+
col = compiler.process(clauses[0])
17+
percentile = clauses[2].value
18+
dimension_col = clauses[3].value if len(clauses) > 3 else None
19+
over = f"OVER(PARTITION BY {dimension_col})" if dimension_col else "OVER()"
1720
# According to the documentation available at https://mariadb.com/kb/en/median/#description,
1821
# the PERCENTILE_CONT function can be utilized to calculate the median. Therefore, it is
1922
# being used in this context.
20-
return f"PERCENTILE_CONT({percentile:.2f}) WITHIN GROUP (ORDER BY {col}) OVER()"
23+
return f"PERCENTILE_CONT({percentile:.2f}) WITHIN GROUP (ORDER BY {col}) {over}"

ingestion/src/metadata/profiler/source/database/mariadb/metrics/window/first_quartile.py

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,8 @@
55

66

77
class MariaDBFirstQuartile(FirstQuartile):
8-
def _compute_sqa_fn(self, column, table, percentile):
8+
def _compute_sqa_fn(self, column, table, percentile, dimension_col=None):
99
"""Generic method to compute the quartile using sqlalchemy"""
10+
if dimension_col is not None:
11+
return MariaDBMedianFn(column, table, percentile, dimension_col)
1012
return MariaDBMedianFn(column, table, percentile)

ingestion/src/metadata/profiler/source/database/mariadb/metrics/window/median.py

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,8 @@
55

66

77
class MariaDBMedian(Median):
8-
def _compute_sqa_fn(self, column, table, percentile):
8+
def _compute_sqa_fn(self, column, table, percentile, dimension_col=None):
99
"""Generic method to compute the quartile using sqlalchemy"""
10+
if dimension_col is not None:
11+
return MariaDBMedianFn(column, table, percentile, dimension_col)
1012
return MariaDBMedianFn(column, table, percentile)

ingestion/src/metadata/profiler/source/database/mariadb/metrics/window/third_quartile.py

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,8 @@
55

66

77
class MariaDBThirdQuartile(ThirdQuartile):
8-
def _compute_sqa_fn(self, column, table, percentile):
8+
def _compute_sqa_fn(self, column, table, percentile, dimension_col=None):
99
"""Generic method to compute the quartile using sqlalchemy"""
10+
if dimension_col is not None:
11+
return MariaDBMedianFn(column, table, percentile, dimension_col)
1012
return MariaDBMedianFn(column, table, percentile)

ingestion/src/metadata/profiler/source/database/single_store/functions/median.py

Lines changed: 11 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,15 @@ class SingleStoreMedianFn(FunctionElement):
1212

1313
@compiles(SingleStoreMedianFn)
1414
def _(elements, compiler, **kwargs): # pylint: disable=unused-argument
15-
col = compiler.process(elements.clauses.clauses[0])
16-
percentile = elements.clauses.clauses[2].value
15+
clauses = elements.clauses.clauses
16+
col = compiler.process(clauses[0])
17+
table = clauses[1].value
18+
percentile = clauses[2].value
19+
dimension_col = clauses[3].value if len(clauses) > 3 else None
20+
if dimension_col:
21+
return (
22+
f"(SELECT approx_percentile({col}, {percentile:.2f}) "
23+
f"FROM {table} AS median_inner "
24+
f"WHERE median_inner.{dimension_col} = {table}.{dimension_col})"
25+
)
1726
return f"approx_percentile({col}, {percentile:.2f})"

ingestion/src/metadata/profiler/source/database/single_store/metrics/window/first_quartile.py

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,8 @@
77

88

99
class SingleStoreFirstQuartile(FirstQuartile):
10-
def _compute_sqa_fn(self, column, table, percentile):
10+
def _compute_sqa_fn(self, column, table, percentile, dimension_col=None):
1111
"""Generic method to compute the quartile using sqlalchemy"""
12+
if dimension_col is not None:
13+
return SingleStoreMedianFn(column, table, percentile, dimension_col)
1214
return SingleStoreMedianFn(column, table, percentile)

ingestion/src/metadata/profiler/source/database/single_store/metrics/window/median.py

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,8 @@
77

88

99
class SingleStoreMedian(Median):
10-
def _compute_sqa_fn(self, column, table, percentile):
10+
def _compute_sqa_fn(self, column, table, percentile, dimension_col=None):
1111
"""Generic method to compute the quartile using sqlalchemy"""
12+
if dimension_col is not None:
13+
return SingleStoreMedianFn(column, table, percentile, dimension_col)
1214
return SingleStoreMedianFn(column, table, percentile)

ingestion/src/metadata/profiler/source/database/single_store/metrics/window/third_quartile.py

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,8 @@
77

88

99
class SingleStoreThirdQuartile(ThirdQuartile):
10-
def _compute_sqa_fn(self, column, table, percentile):
10+
def _compute_sqa_fn(self, column, table, percentile, dimension_col=None):
1111
"""Generic method to compute the quartile using sqlalchemy"""
12+
if dimension_col is not None:
13+
return SingleStoreMedianFn(column, table, percentile, dimension_col)
1214
return SingleStoreMedianFn(column, table, percentile)
Lines changed: 181 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,181 @@
1+
# Copyright 2025 Collate
2+
# Licensed under the Collate Community License, Version 1.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
6+
# Unless required by applicable law or agreed to in writing, software
7+
# distributed under the License is distributed on an "AS IS" BASIS,
8+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
9+
# See the License for the specific language governing permissions and
10+
# limitations under the License.
11+
12+
"""
13+
Integration tests for MariaDB median/percentile functions against a real MariaDB container.
14+
15+
Validates that MariaDBMedianFn produces correct SQL and returns accurate results
16+
for both whole-table (OVER()) and per-dimension (OVER(PARTITION BY dimension_col)) modes.
17+
"""
18+
19+
import pytest
20+
from sqlalchemy import Column, Float, Integer, String, column, create_engine, text
21+
from sqlalchemy.orm import DeclarativeBase, Session
22+
23+
from metadata.profiler.source.database.mariadb.functions.median import MariaDBMedianFn
24+
25+
try:
26+
from testcontainers.mysql import MySqlContainer
27+
except ImportError:
28+
pytest.skip("testcontainers not installed", allow_module_level=True)
29+
30+
31+
class Base(DeclarativeBase):
32+
pass
33+
34+
35+
class MedianTestData(Base):
36+
__tablename__ = "test_data"
37+
id = Column(Integer, primary_key=True)
38+
value = Column(Float, nullable=False)
39+
category = Column(String(50), nullable=False)
40+
41+
42+
# Test data: 10 rows, 2 categories
43+
# category "a": values [10, 20, 30, 40, 50] -> median=30, Q1=20, Q3=40
44+
# category "b": values [100, 200, 300, 400, 500] -> median=300, Q1=200, Q3=400
45+
TEST_ROWS = [
46+
(1, 10.0, "a"),
47+
(2, 20.0, "a"),
48+
(3, 30.0, "a"),
49+
(4, 40.0, "a"),
50+
(5, 50.0, "a"),
51+
(6, 100.0, "b"),
52+
(7, 200.0, "b"),
53+
(8, 300.0, "b"),
54+
(9, 400.0, "b"),
55+
(10, 500.0, "b"),
56+
]
57+
58+
59+
def _compile_median_fn(session, col_name, table_name, percentile, dimension_col=None):
60+
"""Compile a MariaDBMedianFn to SQL string using the session's dialect."""
61+
args = (column(col_name), table_name, percentile)
62+
if dimension_col is not None:
63+
args = args + (dimension_col,)
64+
fn = MariaDBMedianFn(*args)
65+
return fn.compile(
66+
dialect=session.get_bind().dialect,
67+
compile_kwargs={"literal_binds": True},
68+
)
69+
70+
71+
@pytest.fixture(scope="module")
72+
def mariadb_engine():
73+
container = MySqlContainer(image="mariadb:11", dbname="test_db")
74+
with container as container:
75+
url = container.get_connection_url()
76+
if url.startswith("mysql://"):
77+
url = "mysql+pymysql://" + url[len("mysql://") :]
78+
engine = create_engine(url)
79+
with engine.connect() as conn:
80+
conn.execute(
81+
text(
82+
"CREATE TABLE test_data ("
83+
"id INTEGER PRIMARY KEY, "
84+
"value DOUBLE NOT NULL, "
85+
"category VARCHAR(50) NOT NULL)"
86+
)
87+
)
88+
values = ", ".join(f"({row[0]}, {row[1]}, '{row[2]}')" for row in TEST_ROWS)
89+
conn.execute(
90+
text(f"INSERT INTO test_data (id, value, category) VALUES {values}")
91+
)
92+
conn.commit()
93+
yield engine
94+
engine.dispose()
95+
96+
97+
@pytest.fixture(scope="module")
98+
def session(mariadb_engine):
99+
with Session(mariadb_engine) as session:
100+
yield session
101+
102+
103+
class TestMariaDBMedianFn:
104+
def test_median_non_correlated(self, session):
105+
"""PERCENTILE_CONT(0.50) OVER() returns correct median for entire table"""
106+
compiled = _compile_median_fn(session, "value", "test_data", 0.50)
107+
result = session.execute(
108+
text(f"SELECT {compiled} AS median_val FROM test_data LIMIT 1")
109+
).scalar()
110+
assert result is not None
111+
assert result == pytest.approx(75.0, abs=1.0)
112+
113+
def test_first_quartile_non_correlated(self, session):
114+
"""PERCENTILE_CONT(0.25) OVER() returns correct Q1 for entire table"""
115+
compiled = _compile_median_fn(session, "value", "test_data", 0.25)
116+
result = session.execute(
117+
text(f"SELECT {compiled} AS q1_val FROM test_data LIMIT 1")
118+
).scalar()
119+
assert result is not None
120+
assert result == pytest.approx(32.5, abs=1.0)
121+
122+
def test_third_quartile_non_correlated(self, session):
123+
"""PERCENTILE_CONT(0.75) OVER() returns correct Q3 for entire table"""
124+
compiled = _compile_median_fn(session, "value", "test_data", 0.75)
125+
result = session.execute(
126+
text(f"SELECT {compiled} AS q3_val FROM test_data LIMIT 1")
127+
).scalar()
128+
assert result is not None
129+
assert result == pytest.approx(275.0, abs=1.0)
130+
131+
def test_median_with_dimension_col(self, session):
132+
"""MariaDBMedianFn with dimension_col generates PARTITION BY and returns per-group median"""
133+
compiled = _compile_median_fn(session, "value", "test_data", 0.50, "category")
134+
results = session.execute(
135+
text(
136+
f"SELECT DISTINCT category, {compiled} AS median_val "
137+
"FROM test_data ORDER BY category"
138+
)
139+
).fetchall()
140+
medians = {row[0]: row[1] for row in results}
141+
assert medians["a"] == pytest.approx(30.0, abs=1.0)
142+
assert medians["b"] == pytest.approx(300.0, abs=1.0)
143+
144+
def test_first_quartile_with_dimension_col(self, session):
145+
"""MariaDBMedianFn Q1 with dimension_col returns per-group first quartile"""
146+
compiled = _compile_median_fn(session, "value", "test_data", 0.25, "category")
147+
results = session.execute(
148+
text(
149+
f"SELECT DISTINCT category, {compiled} AS q1_val "
150+
"FROM test_data ORDER BY category"
151+
)
152+
).fetchall()
153+
medians = {row[0]: row[1] for row in results}
154+
assert medians["a"] == pytest.approx(20.0, abs=1.0)
155+
assert medians["b"] == pytest.approx(200.0, abs=1.0)
156+
157+
def test_third_quartile_with_dimension_col(self, session):
158+
"""MariaDBMedianFn Q3 with dimension_col returns per-group third quartile"""
159+
compiled = _compile_median_fn(session, "value", "test_data", 0.75, "category")
160+
results = session.execute(
161+
text(
162+
f"SELECT DISTINCT category, {compiled} AS q3_val "
163+
"FROM test_data ORDER BY category"
164+
)
165+
).fetchall()
166+
medians = {row[0]: row[1] for row in results}
167+
assert medians["a"] == pytest.approx(40.0, abs=1.0)
168+
assert medians["b"] == pytest.approx(400.0, abs=1.0)
169+
170+
def test_compiled_sql_contains_partition_by(self, session):
171+
"""Verify the compiled SQL includes PARTITION BY when dimension_col is set"""
172+
compiled = str(
173+
_compile_median_fn(session, "value", "test_data", 0.50, "category")
174+
)
175+
assert "PARTITION BY category" in compiled
176+
177+
def test_compiled_sql_no_partition_without_dimension(self, session):
178+
"""Verify the compiled SQL uses plain OVER() without dimension_col"""
179+
compiled = str(_compile_median_fn(session, "value", "test_data", 0.50))
180+
assert "OVER()" in compiled
181+
assert "PARTITION BY" not in compiled

0 commit comments

Comments
 (0)