
Commit 5e14164

Authored by RajdeepKushwaha5, TeddyCr, and Copilot

fix(sampler): Respect randomizedSample flag at 100% percentage sampling (#26966)
* fix(sampler): respect randomizedSample flag at 100% percentage sampling

  When profileSample is 100% with PERCENTAGE type, the sampler short-circuits
  and returns the raw dataset without any randomization, even when
  randomizedSample is True (the default). Split the combined condition so:

  - No profileSample set -> return raw dataset (no sampling configured)
  - 100% PERCENTAGE + randomizedSample=False -> return raw dataset (optimization)
  - 100% PERCENTAGE + randomizedSample=True -> go through the normal sampling
    path, which applies RandomNumFn/df.sample for proper row shuffling

  Fixes #21304

* Address review: use 'is False' for Optional[bool] and add unit tests

  - Fix the randomizedSample check from 'not' to 'is False' in both SQASampler
    and DatalakeSampler to correctly handle None (Optional[bool], default=True)
  - Add unit tests verifying 100% PERCENTAGE behavior for randomizedSample
    values True, False, and None

* Add ORDER BY on random column in fetch_sample_data for true randomization

  The get_dataset() fix ensures 100% PERCENTAGE + randomizedSample routes
  through get_sample_query(), which produces a CTE with a random column. Now
  fetch_sample_data() detects that column and applies ORDER BY before LIMIT,
  so each call returns a different subset of rows. Also add real-DB
  integration tests using SQLite for the 100% PERCENTAGE edge case
  (True, False, None).

* Address review: remove stale comment, unused import, add return assertions

* Apply suggestion from @Copilot

* Address review: move ORDER BY to get_sample_query, clean up fetch_sample_data

  - Move ORDER BY rnd.c.random into the get_sample_query() PERCENTAGE branch,
    gated on randomizedSample is not False (mirrors the ABSOLUTE branch pattern)
  - Revert fetch_sample_data() to the original: remove the ds_columns variable,
    random_column detection, and ORDER BY logic (ordering is now handled in the CTE)
  - Remove duplicate assertions in the DatalakeSampler100Pct tests

* Address review: None defaults to False for randomizedSample

  Per TeddyCr's feedback, randomization is computationally heavy and should
  not be the default. Changed from 'is False'/'is not False' to truthiness
  checks so None (unset) behaves the same as False. Only an explicit
  randomizedSample=True triggers ORDER BY and skips the 100% fast path. This
  is consistent with the ABSOLUTE branch, which already uses truthiness checks.

* Fix integration test: None should skip sample_query (matches truthiness semantics)

* fix(tests): update BigQuery view sampling expected queries with ORDER BY

  BigQuery views fall through to SQASampler.get_sample_query(), which now adds
  ORDER BY rnd.random when randomizedSample is enabled. Update the expected
  SQL strings in test_sampling_for_views and test_sampling_view_with_partition
  to match.

* refactor: use explicit is False for randomizedSample checks

  Address review comments: SampleConfig.randomizedSample defaults to True, so
  only an explicit False should disable randomization. Using is False /
  is not False instead of truthiness ensures None follows the model default
  (enabled) rather than being incorrectly treated as disabled.

* ci: re-trigger checks after SIGSEGV flake

* refactor: only explicit True randomizes, add non-determinism tests

* test: increase non-determinism iterations to reduce flakiness

* chore: added randomize as false

* fix: align randomizedSample defaults with schema (false)

* fix: remove ORDER BY from BigQuery test expectations

  BigQuery sampling tests create SampleConfig without setting randomizedSample,
  which now defaults to False. Since ORDER BY is only added when
  randomizedSample is True, the expected query strings should not include
  ORDER BY. Also fix an inaccurate docstring in test_sample.py.

* test: increase non-determinism test iterations to reduce flakiness

  Increase the fetch_sample_data loop from 10 to 20 iterations to further
  reduce the theoretical probability of a false failure in the randomized
  ordering test.

---------

Co-authored-by: Teddy <teddy.crepineau@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
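The final dispatch logic described above can be sketched as a small standalone function. This is a hypothetical simplification (`should_short_circuit` is not a real helper in the codebase; the actual checks live inside `SQASampler.get_dataset` and the pandas `DatalakeSampler.get_dataset`), showing why `is not True` makes `None` and `False` both take the fast path while only an explicit `True` routes through the sampling query:

```python
# Hypothetical sketch of the short-circuit decision after this commit.
from dataclasses import dataclass
from typing import Optional, Union


@dataclass
class SampleConfig:
    profileSample: Optional[Union[float, int]] = None
    profileSampleType: str = "PERCENTAGE"
    randomizedSample: Optional[bool] = False  # schema default after this commit


def should_short_circuit(cfg: SampleConfig) -> bool:
    """Return True when the sampler can hand back the raw dataset untouched."""
    if not cfg.profileSample:
        return True  # no sampling configured at all
    if (
        cfg.profileSampleType == "PERCENTAGE"
        and cfg.profileSample == 100
        and cfg.randomizedSample is not True  # only explicit True randomizes
    ):
        return True  # 100% without randomization: skip the sampling CTE
    return False  # real sampling (or 100% randomized) goes through get_sample_query
```

Note that a truthiness check (`not cfg.randomizedSample`) would behave identically here; `is not True` was kept to make the "only explicit True" intent unambiguous for an `Optional[bool]`.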
1 parent 8018f9a commit 5e14164

11 files changed: 298 additions & 15 deletions

bootstrap/sql/migrations/native/1.13.0/mysql/postDataMigrationSQLScript.sql

Lines changed: 18 additions & 0 deletions
```diff
@@ -3,6 +3,24 @@ SET json = JSON_REMOVE(json, '$.sourceConfig.config.computeMetrics')
 WHERE JSON_EXTRACT(json, '$.sourceConfig.config.computeMetrics') IS NOT NULL
 AND pipelineType = 'profiler';
 
+-- Set randomizedSample to false where it was true (old default behavior)
+UPDATE ingestion_pipeline_entity
+SET json = JSON_SET(json, '$.sourceConfig.config.randomizedSample', false)
+WHERE JSON_EXTRACT(json, '$.sourceConfig.config.randomizedSample') = true
+AND pipelineType = 'profiler';
+
+UPDATE table_entity
+SET json = JSON_SET(json, '$.tableProfilerConfig.randomizedSample', false)
+WHERE JSON_EXTRACT(json, '$.tableProfilerConfig.randomizedSample') = true;
+
+UPDATE database_entity
+SET json = JSON_SET(json, '$.databaseProfilerConfig.randomizedSample', false)
+WHERE JSON_EXTRACT(json, '$.databaseProfilerConfig.randomizedSample') = true;
+
+UPDATE database_schema_entity
+SET json = JSON_SET(json, '$.databaseSchemaProfilerConfig.randomizedSample', false)
+WHERE JSON_EXTRACT(json, '$.databaseSchemaProfilerConfig.randomizedSample') = true;
+
 -- Hard-delete ingestion pipelines for Iceberg services (must run before service migration)
 DELETE ipe FROM ingestion_pipeline_entity ipe
 JOIN dbservice_entity dse
```

bootstrap/sql/migrations/native/1.13.0/postgres/postDataMigrationSQLScript.sql

Lines changed: 18 additions & 0 deletions
```diff
@@ -3,6 +3,24 @@ SET json = (json::jsonb #- '{sourceConfig,config,computeMetrics}')::json
 WHERE json::jsonb -> 'sourceConfig' -> 'config' -> 'computeMetrics' IS NOT NULL
 AND pipelineType = 'profiler';
 
+-- Set randomizedSample to false where it was true (old default behavior)
+UPDATE ingestion_pipeline_entity
+SET json = jsonb_set(json::jsonb, '{sourceConfig,config,randomizedSample}', 'false'::jsonb)::json
+WHERE json::jsonb #>> '{sourceConfig,config,randomizedSample}' = 'true'
+AND pipelineType = 'profiler';
+
+UPDATE table_entity
+SET json = jsonb_set(json::jsonb, '{tableProfilerConfig,randomizedSample}', 'false'::jsonb)::json
+WHERE json::jsonb #>> '{tableProfilerConfig,randomizedSample}' = 'true';
+
+UPDATE database_entity
+SET json = jsonb_set(json::jsonb, '{databaseProfilerConfig,randomizedSample}', 'false'::jsonb)::json
+WHERE json::jsonb #>> '{databaseProfilerConfig,randomizedSample}' = 'true';
+
+UPDATE database_schema_entity
+SET json = jsonb_set(json::jsonb, '{databaseSchemaProfilerConfig,randomizedSample}', 'false'::jsonb)::json
+WHERE json::jsonb #>> '{databaseSchemaProfilerConfig,randomizedSample}' = 'true';
+
 -- Hard-delete ingestion pipelines for Iceberg services (must run before service migration)
 DELETE FROM ingestion_pipeline_entity ipe
 USING dbservice_entity dse
```

ingestion/src/metadata/sampler/models.py

Lines changed: 4 additions & 3 deletions
```diff
@@ -11,6 +11,7 @@
 """
 Sampling Models
 """
+
 from typing import Any, List, Optional, Union
 
 from pydantic import Field, model_validator
@@ -42,7 +43,7 @@ class BaseProfileConfig(ConfigModel):
     profileSampleType: Optional[ProfileSampleType] = None
     samplingMethodType: Optional[SamplingMethodType] = None
     sampleDataCount: Optional[int] = 100
-    randomizedSample: Optional[bool] = True
+    randomizedSample: Optional[bool] = False
 
 
 class ColumnConfig(ConfigModel):
@@ -58,7 +59,7 @@ class TableConfig(BaseProfileConfig):
     profileQuery: Optional[str] = None
     partitionConfig: Optional[PartitionProfilerConfig] = None
     columnConfig: Optional[ColumnConfig] = None
-    randomizedSample: Optional[bool] = True
+    randomizedSample: Optional[bool] = False
 
     @classmethod
     def from_database_and_schema_config(
@@ -127,4 +128,4 @@ class SampleConfig(ConfigModel):
     profileSample: Optional[Union[float, int]] = None
     profileSampleType: Optional[ProfileSampleType] = ProfileSampleType.PERCENTAGE
     samplingMethodType: Optional[SamplingMethodType] = None
-    randomizedSample: Optional[bool] = True
+    randomizedSample: Optional[bool] = False
```

ingestion/src/metadata/sampler/pandas/sampler.py

Lines changed: 5 additions & 1 deletion
```diff
@@ -107,9 +107,13 @@ def get_dataset(self, **__):
         if self.partition_details:
             raw_dataset = self._partitioned_table()
 
-        if not self.sample_config.profileSample or (
+        if not self.sample_config.profileSample:
+            return raw_dataset
+
+        if (
             self.sample_config.profileSample == 100
             and self.sample_config.profileSampleType == ProfileSampleType.PERCENTAGE
+            and self.sample_config.randomizedSample is not True
         ):
             return raw_dataset
         return self.get_sampled_dataframe(raw_dataset, self.sample_config)
```
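For the dataframe path above, the effect of taking the normal sampling route at 100% is essentially a full-frame shuffle. A minimal illustration with plain pandas (a hypothetical standalone example; the real code goes through `get_sampled_dataframe`, which wraps `df.sample`):

```python
# Sampling 100% of rows with df.sample(frac=1.0) keeps every row but
# returns them in a random order -- this is what randomizedSample=True
# buys on the pandas/datalake path.
import pandas as pd

df = pd.DataFrame({"id": range(10)})

shuffled = df.sample(frac=1.0)  # all 10 rows, random order

# Same rows survive the shuffle; only the ordering changes.
assert len(shuffled) == len(df)
assert sorted(shuffled["id"]) == list(range(10))
```

With `randomizedSample` unset or `False`, the short-circuit skips this call entirely and returns `raw_dataset` as-is.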

ingestion/src/metadata/sampler/sqlalchemy/sampler.py

Lines changed: 15 additions & 6 deletions
```diff
@@ -164,22 +164,25 @@ def get_sample_query(self, *, column=None) -> Query:
                 (ModuloFn(RandomNumFn(), 100)).label(RANDOM_LABEL),
             ).cte(f"{self.get_sampler_table_name()}_rnd")
             session_query = client.query(rnd)
-            return session_query.where(
+            query = session_query.where(
                 rnd.c.random <= self.sample_config.profileSample
-            ).cte(f"{self.get_sampler_table_name()}_sample")
+            )
+            if self.sample_config.randomizedSample is True:
+                query = query.order_by(rnd.c.random)
+            return query.cte(f"{self.get_sampler_table_name()}_sample")
 
         table_query = client.query(self.raw_dataset)
         if self.partition_details:
             table_query = self.get_partitioned_query(table_query)
         session_query = self._base_sample_query(
             column,
             (ModuloFn(RandomNumFn(), table_query.count())).label(RANDOM_LABEL)
-            if self.sample_config.randomizedSample
+            if self.sample_config.randomizedSample is True
             else None,
         )
         query = (
             session_query.order_by(RANDOM_LABEL)
-            if self.sample_config.randomizedSample
+            if self.sample_config.randomizedSample is True
             else session_query
         )
         return query.limit(self.sample_config.profileSample).cte(
@@ -194,9 +197,16 @@ def get_dataset(self, column=None, **__) -> Union[type, AliasedClass]:
         if self.sample_query:
             return self._rdn_sample_from_user_query()
 
-        if not self.sample_config.profileSample or (
+        if not self.sample_config.profileSample:
+            if self.partition_details:
+                return self._partitioned_table()
+
+            return self.raw_dataset
+
+        if (
             self.sample_config.profileSampleType == ProfileSampleType.PERCENTAGE
             and self.sample_config.profileSample == 100
+            and self.sample_config.randomizedSample is not True
         ):
             if self.partition_details:
                 return self._partitioned_table()
@@ -217,7 +227,6 @@ def fetch_sample_data(self, columns: Optional[List[Column]] = None) -> TableData
         if self.sample_query:
             return self._fetch_sample_data_from_user_query()
 
-        # Add new RandomNumFn column
         ds = self.get_dataset()
         if not columns:
             sqa_columns = [col for col in inspect(ds).c if col.name != RANDOM_LABEL]
```
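The PERCENTAGE branch above builds a CTE carrying a random column, filters on it, and (when randomization is enabled) orders by it before aliasing. A self-contained SQLAlchemy sketch against in-memory SQLite (the table, CTE names, and `abs(random()) % 100` expression are stand-ins for the real `RandomNumFn`/`ModuloFn` abstractions and sampler naming):

```python
# Sketch of the random-column CTE pattern: filter rows by a per-row random
# value, then ORDER BY that value so each execution yields a fresh ordering.
from sqlalchemy import Column, Integer, create_engine, func
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([User(id=i) for i in range(1, 101)])
    session.commit()

    # CTE with a 0-99 random value per row (SQLite: abs(random()) % 100)
    rnd = session.query(
        User.id, (func.abs(func.random()) % 100).label("random")
    ).cte("users_rnd")

    # At profileSample=100 the filter keeps every row ...
    query = session.query(rnd).where(rnd.c.random <= 100)
    # ... and the ORDER BY (added only when randomizedSample is True)
    # is what makes the returned ordering non-deterministic.
    query = query.order_by(rnd.c.random)
    sample = query.cte("users_sample")

    rows = session.query(sample.c.id).all()
```

Without the `order_by`, the CTE at 100% is just the table in storage order, which is why the commit also lets `randomizedSample is not True` skip the CTE entirely.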

ingestion/tests/unit/observability/profiler/sqlalchemy/test_sample.py

Lines changed: 114 additions & 1 deletion
```diff
@@ -21,7 +21,12 @@
 from sqlalchemy.orm import DeclarativeBase
 
 from metadata.generated.schema.entity.data.table import Column as EntityColumn
-from metadata.generated.schema.entity.data.table import ColumnName, DataType, Table
+from metadata.generated.schema.entity.data.table import (
+    ColumnName,
+    DataType,
+    ProfileSampleType,
+    Table,
+)
 from metadata.generated.schema.entity.services.connections.database.sqliteConnection import (
     SQLiteConnection,
     SQLiteScheme,
@@ -361,6 +366,114 @@ def test_sample_from_user_query(self, sampler_mock):
         names = [col.root for col in sample_data.columns]
         assert names == ["id", "name"]
 
+    def test_full_percentage_randomized_uses_sample_query(self, sampler_mock):
+        """100% PERCENTAGE + randomizedSample=True should go through
+        get_sample_query which adds ORDER BY on the random column."""
+        with patch.object(SQASampler, "build_table_orm", return_value=User):
+            sampler = SQASampler(
+                service_connection_config=self.sqlite_conn,
+                ometa_client=None,
+                entity=None,
+                sample_config=SampleConfig(
+                    profileSampleType=ProfileSampleType.PERCENTAGE,
+                    profileSample=100,
+                    randomizedSample=True,
+                ),
+                sample_data_count=5,
+            )
+
+        with patch.object(
+            sampler, "get_sample_query", wraps=sampler.get_sample_query
+        ) as mock_gsq:
+            sampler.fetch_sample_data()
+            assert mock_gsq.called
+
+    def test_full_percentage_not_randomized_skips_sample_query(self, sampler_mock):
+        """100% PERCENTAGE + randomizedSample=False should short-circuit
+        to raw dataset and NOT call get_sample_query."""
+        with patch.object(SQASampler, "build_table_orm", return_value=User):
+            sampler = SQASampler(
+                service_connection_config=self.sqlite_conn,
+                ometa_client=None,
+                entity=None,
+                sample_config=SampleConfig(
+                    profileSampleType=ProfileSampleType.PERCENTAGE,
+                    profileSample=100,
+                    randomizedSample=False,
+                ),
+                sample_data_count=5,
+            )
+
+        with patch.object(
+            sampler, "get_sample_query", wraps=sampler.get_sample_query
+        ) as mock_gsq:
+            sampler.fetch_sample_data()
+            assert not mock_gsq.called
+
+    def test_full_percentage_none_randomized_skips_sample_query(self, sampler_mock):
+        """100% PERCENTAGE + randomizedSample=None should short-circuit
+        (only explicit True enables randomization)."""
+        with patch.object(SQASampler, "build_table_orm", return_value=User):
+            sampler = SQASampler(
+                service_connection_config=self.sqlite_conn,
+                ometa_client=None,
+                entity=None,
+                sample_config=SampleConfig(
+                    profileSampleType=ProfileSampleType.PERCENTAGE,
+                    profileSample=100,
+                    randomizedSample=None,
+                ),
+                sample_data_count=5,
+            )
+
+        with patch.object(
+            sampler, "get_sample_query", wraps=sampler.get_sample_query
+        ) as mock_gsq:
+            sampler.fetch_sample_data()
+            assert not mock_gsq.called
+
+    def test_randomized_true_produces_non_deterministic_rows(self, sampler_mock):
+        """With randomizedSample=True at 100% PERCENTAGE, multiple
+        fetch_sample_data calls should return rows in different orders."""
+        with patch.object(SQASampler, "build_table_orm", return_value=User):
+            sampler = SQASampler(
+                service_connection_config=self.sqlite_conn,
+                ometa_client=None,
+                entity=None,
+                sample_config=SampleConfig(
+                    profileSampleType=ProfileSampleType.PERCENTAGE,
+                    profileSample=100,
+                    randomizedSample=True,
+                ),
+                sample_data_count=5,
+            )
+
+        results = [sampler.fetch_sample_data().rows for _ in range(20)]
+        assert any(
+            results[i] != results[0] for i in range(1, len(results))
+        ), "Expected non-deterministic row ordering with randomizedSample=True"
+
+    def test_randomized_false_produces_deterministic_rows(self, sampler_mock):
+        """With randomizedSample=False at 100% PERCENTAGE, multiple
+        fetch_sample_data calls should return rows in the same order."""
+        with patch.object(SQASampler, "build_table_orm", return_value=User):
+            sampler = SQASampler(
+                service_connection_config=self.sqlite_conn,
+                ometa_client=None,
+                entity=None,
+                sample_config=SampleConfig(
+                    profileSampleType=ProfileSampleType.PERCENTAGE,
+                    profileSample=100,
+                    randomizedSample=False,
+                ),
+                sample_data_count=5,
+            )
+
+        results = [sampler.fetch_sample_data().rows for _ in range(5)]
+        assert all(
+            results[i] == results[0] for i in range(1, len(results))
+        ), "Expected deterministic row ordering with randomizedSample=False"
+
     @classmethod
     def tearDownClass(cls) -> None:
         os.remove(cls.db_path)
```
