Skip to content

Commit 47c88d4

Browse files
ISSUE #3031 - Dynamic Sampling Config (#27184)
* feat: move flat sampling to sampling config + dynamic sampling option * feat: move flat sampling on the backend to sample profile config object * feat: fix circular import * feat: align UI with new profiler config * feat: fix json schema * feat: align python imports with new schema path * feat: update migration to look at extension * feat: remove enable * feat: remove enable * feat: added titles to sample config * feat: generated ts classes * feat: addressed comments * feat: change sample config instantiation to match new structure * feat: removed backward compatible fields * feat: ran java linting * UI fixes, tests and locale changes * fix failing test * fix ui check style * fix failing profiler test * feat: fix ci failures * feat: generated ts classes * feat: fix ci failure * fix: failing ci * address comments * fix failing test * fix: ci failure --------- Co-authored-by: Harshit Shah <dinkushah169@gmail.com>
1 parent 5ffff63 commit 47c88d4

95 files changed

Lines changed: 2891 additions & 630 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

bootstrap/sql/migrations/native/1.13.0/mysql/schemaChanges.sql

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,86 @@ FROM user_entity ue, role_entity re
130130
WHERE ue.name = 'mcpapplicationbot'
131131
AND re.name = 'ApplicationBotImpersonationRole';
132132

133+
134+
UPDATE entity_extension
135+
SET json = JSON_SET(
136+
json,
137+
'$.profileSampleConfig',
138+
JSON_OBJECT(
139+
'sampleConfigType', 'STATIC',
140+
'config', JSON_OBJECT(
141+
'profileSample', JSON_EXTRACT(json, '$.profileSample'),
142+
'profileSampleType', COALESCE(
143+
JSON_EXTRACT(json, '$.profileSampleType'),
144+
CAST('"PERCENTAGE"' AS JSON)
145+
),
146+
'samplingMethodType', JSON_EXTRACT(json, '$.samplingMethodType')
147+
)
148+
)
149+
)
150+
WHERE extension IN (
151+
'table.tableProfilerConfig',
152+
'database.databaseProfilerConfig',
153+
'databaseSchema.databaseSchemaProfilerConfig'
154+
)
155+
AND JSON_EXTRACT(json, '$.profileSample') IS NOT NULL
156+
AND JSON_TYPE(JSON_EXTRACT(json, '$.profileSample')) != 'NULL'
157+
AND NOT JSON_CONTAINS_PATH(json, 'one', '$.profileSampleConfig');
158+
159+
-- entity_extension: remove old flat fields
160+
UPDATE entity_extension
161+
SET json = JSON_REMOVE(
162+
JSON_REMOVE(
163+
JSON_REMOVE(json, '$.samplingMethodType'),
164+
'$.profileSampleType'
165+
),
166+
'$.profileSample'
167+
)
168+
WHERE extension IN (
169+
'table.tableProfilerConfig',
170+
'database.databaseProfilerConfig',
171+
'databaseSchema.databaseSchemaProfilerConfig'
172+
)
173+
AND (JSON_CONTAINS_PATH(json, 'one', '$.profileSample')
174+
OR JSON_CONTAINS_PATH(json, 'one', '$.profileSampleType')
175+
OR JSON_CONTAINS_PATH(json, 'one', '$.samplingMethodType'));
176+
177+
-- ingestion_pipeline_entity (profiler pipelines): build profileSampleConfig (skip if already migrated)
178+
UPDATE ingestion_pipeline_entity
179+
SET json = JSON_SET(
180+
json,
181+
'$.sourceConfig.config.profileSampleConfig',
182+
JSON_OBJECT(
183+
'sampleConfigType', 'STATIC',
184+
'config', JSON_OBJECT(
185+
'profileSample', JSON_EXTRACT(json, '$.sourceConfig.config.profileSample'),
186+
'profileSampleType', COALESCE(
187+
JSON_EXTRACT(json, '$.sourceConfig.config.profileSampleType'),
188+
CAST('"PERCENTAGE"' AS JSON)
189+
),
190+
'samplingMethodType', JSON_EXTRACT(json, '$.sourceConfig.config.samplingMethodType')
191+
)
192+
)
193+
)
194+
WHERE pipelineType = 'profiler'
195+
AND JSON_EXTRACT(json, '$.sourceConfig.config.profileSample') IS NOT NULL
196+
AND JSON_TYPE(JSON_EXTRACT(json, '$.sourceConfig.config.profileSample')) != 'NULL'
197+
AND NOT JSON_CONTAINS_PATH(json, 'one', '$.sourceConfig.config.profileSampleConfig');
198+
199+
-- ingestion_pipeline_entity (profiler pipelines): remove old flat fields
200+
UPDATE ingestion_pipeline_entity
201+
SET json = JSON_REMOVE(
202+
JSON_REMOVE(
203+
JSON_REMOVE(json, '$.sourceConfig.config.samplingMethodType'),
204+
'$.sourceConfig.config.profileSampleType'
205+
),
206+
'$.sourceConfig.config.profileSample'
207+
)
208+
WHERE pipelineType = 'profiler'
209+
AND (JSON_CONTAINS_PATH(json, 'one', '$.sourceConfig.config.profileSample')
210+
OR JSON_CONTAINS_PATH(json, 'one', '$.sourceConfig.config.profileSampleType')
211+
OR JSON_CONTAINS_PATH(json, 'one', '$.sourceConfig.config.samplingMethodType'));
212+
133213
-- RDF distributed indexing state tables
134214
CREATE TABLE IF NOT EXISTS rdf_index_job (
135215
id VARCHAR(36) NOT NULL,

bootstrap/sql/migrations/native/1.13.0/postgres/schemaChanges.sql

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,83 @@ WHERE ue.name = 'mcpapplicationbot'
151151
AND re.name = 'ApplicationBotImpersonationRole'
152152
ON CONFLICT DO NOTHING;
153153

154+
-- Migrate profiler sampling config: move flat profileSample/profileSampleType/samplingMethodType
155+
-- into the new profileSampleConfig structure. Default to STATIC since DYNAMIC is new.
156+
157+
-- Profiler configs are stored in entity_extension table, not in entity json columns.
158+
-- Extension keys: table.tableProfilerConfig, database.databaseProfilerConfig, databaseSchema.databaseSchemaProfilerConfig
159+
-- The json column in entity_extension contains the config object directly (flat root-level fields).
160+
161+
-- entity_extension: build profileSampleConfig from existing flat fields (skip if already migrated)
162+
UPDATE entity_extension
163+
SET json = jsonb_set(
164+
json::jsonb,
165+
'{profileSampleConfig}',
166+
jsonb_build_object(
167+
'sampleConfigType', 'STATIC',
168+
'config', jsonb_build_object(
169+
'profileSample', json::jsonb #> '{profileSample}',
170+
'profileSampleType', COALESCE(
171+
json::jsonb #> '{profileSampleType}',
172+
'"PERCENTAGE"'::jsonb
173+
),
174+
'samplingMethodType', json::jsonb #> '{samplingMethodType}'
175+
)
176+
)
177+
)::json
178+
WHERE extension IN (
179+
'table.tableProfilerConfig',
180+
'database.databaseProfilerConfig',
181+
'databaseSchema.databaseSchemaProfilerConfig'
182+
)
183+
AND json::jsonb #>> '{profileSample}' IS NOT NULL
184+
AND json::jsonb #> '{profileSampleConfig}' IS NULL;
185+
186+
-- entity_extension: remove old flat fields
187+
UPDATE entity_extension
188+
SET json = (json::jsonb #- '{profileSample}'
189+
#- '{profileSampleType}'
190+
#- '{samplingMethodType}')::json
191+
WHERE extension IN (
192+
'table.tableProfilerConfig',
193+
'database.databaseProfilerConfig',
194+
'databaseSchema.databaseSchemaProfilerConfig'
195+
)
196+
AND (json::jsonb #>> '{profileSample}' IS NOT NULL
197+
OR json::jsonb #>> '{profileSampleType}' IS NOT NULL
198+
OR json::jsonb #>> '{samplingMethodType}' IS NOT NULL);
199+
200+
-- ingestion_pipeline_entity (profiler pipelines): build profileSampleConfig (skip if already migrated)
201+
UPDATE ingestion_pipeline_entity
202+
SET json = jsonb_set(
203+
json::jsonb,
204+
'{sourceConfig,config,profileSampleConfig}',
205+
jsonb_build_object(
206+
'sampleConfigType', 'STATIC',
207+
'config', jsonb_build_object(
208+
'profileSample', json::jsonb #> '{sourceConfig,config,profileSample}',
209+
'profileSampleType', COALESCE(
210+
json::jsonb #> '{sourceConfig,config,profileSampleType}',
211+
'"PERCENTAGE"'::jsonb
212+
),
213+
'samplingMethodType', json::jsonb #> '{sourceConfig,config,samplingMethodType}'
214+
)
215+
)
216+
)::json
217+
WHERE json #>> '{pipelineType}' = 'profiler'
218+
AND json::jsonb #>> '{sourceConfig,config,profileSample}' IS NOT NULL
219+
AND json::jsonb #> '{sourceConfig,config,profileSampleConfig}' IS NULL;
220+
221+
-- ingestion_pipeline_entity (profiler pipelines): remove old flat fields
222+
UPDATE ingestion_pipeline_entity
223+
SET json = (json::jsonb #- '{sourceConfig,config,profileSample}'
224+
#- '{sourceConfig,config,profileSampleType}'
225+
#- '{sourceConfig,config,samplingMethodType}')::json
226+
WHERE json #>> '{pipelineType}' = 'profiler'
227+
AND (json::jsonb #>> '{sourceConfig,config,profileSample}' IS NOT NULL
228+
OR json::jsonb #>> '{sourceConfig,config,profileSampleType}' IS NOT NULL
229+
OR json::jsonb #>> '{sourceConfig,config,samplingMethodType}' IS NOT NULL);
230+
154231
-- RDF distributed indexing state tables
155232
CREATE TABLE IF NOT EXISTS rdf_index_job (
156233
id VARCHAR(36) NOT NULL,

ingestion/src/metadata/data_quality/runner/base_test_suite_source.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,12 @@
3232
)
3333
from metadata.generated.schema.type.entityReference import EntityReference
3434
from metadata.ingestion.ometa.ometa_api import OpenMetadata
35-
from metadata.sampler.models import SampleConfig
35+
from metadata.sampler.models import (
36+
ProfileSampleConfig,
37+
ProfileSampleConfigType,
38+
SampleConfig,
39+
StaticSamplingConfig,
40+
)
3641
from metadata.sampler.sampler_interface import SamplerInterface
3742
from metadata.utils.bigquery_utils import copy_service_config
3843
from metadata.utils.profiler_utils import get_context_entities
@@ -126,9 +131,16 @@ def create_data_quality_interface(self) -> TestSuiteInterface:
126131
schema_entity=schema_entity,
127132
database_entity=database_entity,
128133
default_sample_config=SampleConfig(
129-
profileSample=self.source_config.profileSample,
130-
profileSampleType=self.source_config.profileSampleType,
131-
samplingMethodType=self.source_config.samplingMethodType,
134+
profileSampleConfig=ProfileSampleConfig(
135+
sampleConfigType=ProfileSampleConfigType.STATIC,
136+
config=StaticSamplingConfig(
137+
profileSample=self.source_config.profileSample,
138+
profileSampleType=self.source_config.profileSampleType,
139+
samplingMethodType=self.source_config.samplingMethodType,
140+
),
141+
)
142+
if self.source_config.profileSample
143+
else None,
132144
),
133145
)
134146

ingestion/src/metadata/data_quality/validations/table/sqlalchemy/tableDiff.py

Lines changed: 25 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
TableDiffRuntimeParameters,
3939
TableParameter,
4040
)
41-
from metadata.generated.schema.entity.data.table import Column, ProfileSampleType
41+
from metadata.generated.schema.entity.data.table import Column
4242
from metadata.generated.schema.entity.services.connections.database.sapHanaConnection import (
4343
SapHanaScheme,
4444
)
@@ -50,6 +50,7 @@
5050
TestCaseStatus,
5151
TestResultValue,
5252
)
53+
from metadata.generated.schema.type.basic import ProfileSampleType
5354
from metadata.profiler.metrics.registry import Metrics
5455
from metadata.profiler.orm.converter.base import build_orm_col
5556
from metadata.profiler.orm.functions.md5 import MD5
@@ -465,16 +466,19 @@ def sample_where_clause(self) -> Tuple[Optional[str], Optional[str]]:
465466
on Table 1 and the hash will ensure that the same row is selected on Table 2. We want to avoid selecting rows
466467
with different ids because the comparison will not be sensible.
467468
"""
468-
if (
469-
# no sample configuration
470-
self.runtime_params.table_profile_config is None
471-
or self.runtime_params.table_profile_config.profileSample is None
472-
# sample is 100% or in other words no sample is required
473-
or (
474-
self.runtime_params.table_profile_config.profileSampleType
475-
== ProfileSampleType.PERCENTAGE
476-
and self.runtime_params.table_profile_config.profileSample == 100
477-
)
469+
config = self.runtime_params.table_profile_config
470+
if config is None:
471+
return None, None
472+
profile_sample_config = config.profileSampleConfig if config else None
473+
sample_config = profile_sample_config.root if profile_sample_config else None
474+
static = sample_config.config if sample_config else None
475+
profile_sample = getattr(static, "profileSample", None) if static else None
476+
profile_sample_type = (
477+
getattr(static, "profileSampleType", None) if static else None
478+
)
479+
if profile_sample is None or (
480+
profile_sample_type == ProfileSampleType.PERCENTAGE
481+
and profile_sample == 100
478482
):
479483
return None, None
480484
if DatabaseServiceType.Mssql in [
@@ -520,26 +524,19 @@ def maybe_case_sensitive(self, iterable: Iterable[str]) -> list[str]:
520524
def calculate_nounce(self, max_nounce=2**32 - 1) -> int:
521525
"""Calculate the nounce based on the profile sample configuration. The nounce is
522526
the sample fraction projected to a number on a scale of 0 to max_nounce"""
523-
if (
524-
self.runtime_params.table_profile_config.profileSampleType
525-
== ProfileSampleType.PERCENTAGE
526-
):
527-
return int(
528-
max_nounce
529-
* self.runtime_params.table_profile_config.profileSample
530-
/ 100
531-
)
532-
if (
533-
self.runtime_params.table_profile_config.profileSampleType
534-
== ProfileSampleType.ROWS
535-
):
527+
config = self.runtime_params.table_profile_config
528+
profile_sample_config = config.profileSampleConfig if config else None
529+
sample_config = profile_sample_config.root if profile_sample_config else None
530+
static = sample_config.config if sample_config else None
531+
profile_sample = getattr(static, "profileSample", 100)
532+
profile_sample_type = getattr(static, "profileSampleType", None)
533+
if profile_sample_type == ProfileSampleType.PERCENTAGE:
534+
return int(max_nounce * profile_sample / 100)
535+
if profile_sample_type == ProfileSampleType.ROWS:
536536
row_count = self.get_total_row_count()
537537
if row_count is None:
538538
raise ValueError("Row count is required for ROWS profile sample type")
539-
return int(
540-
max_nounce
541-
* (self.runtime_params.table_profile_config.profileSample / row_count)
542-
)
539+
return int(max_nounce * (profile_sample / row_count))
543540
raise ValueError("Invalid profile sample type")
544541

545542
def get_row_diff_test_case_result(

ingestion/src/metadata/mixins/pandas/pandas_mixin.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -147,22 +147,23 @@ def get_sampled_dataframe(
147147

148148
def yield_sampled_dfs():
149149
dfs = raw_dataset
150-
if sample_config.profileSampleType == ProfileSampleType.PERCENTAGE:
150+
static = sample_config.get_static_config()
151+
if static and static.profileSampleType == ProfileSampleType.PERCENTAGE:
151152
# Sampling based on percentage of rows will be applied to each dataframe chunk
152153
# to ensure consistent efficiency across large dataset. Other option would be to
153154
# either concatenate all dataframes (may cause OOM) or perform 2 passes (one to count rows,
154155
# another to sample) which would be less efficient.
155156
try:
156-
percentage = sample_config.profileSample or 100
157+
percentage = static.profileSample or 100
157158
for df in dfs():
158159
yield df.sample(frac=percentage / 100)
159160
except Exception as exc:
160161
logger.error(
161-
f"Error sampling dataframes based on percentage {sample_config.profileSample}: {exc}"
162+
f"Error sampling dataframes based on percentage {static.profileSample}: {exc}"
162163
)
163-
elif sample_config.profileSampleType == ProfileSampleType.ROWS:
164+
elif static and static.profileSampleType == ProfileSampleType.ROWS:
164165
try:
165-
rows = sample_config.profileSample or 0
166+
rows = static.profileSample or 0
166167
streamed_rows = 0
167168
for df in dfs():
168169
n = len(df)
@@ -174,7 +175,7 @@ def yield_sampled_dfs():
174175
break
175176
except Exception as exc:
176177
logger.error(
177-
f"Error sampling dataframes based on rows {sample_config.profileSample}: {exc}"
178+
f"Error sampling dataframes based on rows {static.profileSample}: {exc}"
178179
)
179180
else:
180181
logger.warning(

ingestion/src/metadata/profiler/processor/core.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -548,13 +548,15 @@ def get_profile(self) -> CreateTableProfileRequest:
548548
createDateTime=raw_create_date,
549549
sizeInByte=self._table_results.get("sizeInBytes"),
550550
profileSample=(
551-
self.profiler_interface.sampler.sample_config.profileSample
551+
self.profiler_interface.sampler.sample_config.get_static_config().profileSample
552552
if self.profiler_interface.sampler.sample_config
553+
and self.profiler_interface.sampler.sample_config.get_static_config()
553554
else None
554555
),
555556
profileSampleType=(
556-
self.profiler_interface.sampler.sample_config.profileSampleType
557+
self.profiler_interface.sampler.sample_config.get_static_config().profileSampleType
557558
if self.profiler_interface.sampler.sample_config
559+
and self.profiler_interface.sampler.sample_config.get_static_config()
558560
else None
559561
),
560562
customMetrics=self._table_results.get("customMetrics"),

ingestion/src/metadata/profiler/source/database/base/profiler_source.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
get_exclude_columns,
4545
get_include_columns,
4646
)
47-
from metadata.sampler.models import SampleConfig
47+
from metadata.sampler.models import ProfileSampleConfig, SampleConfig
4848
from metadata.sampler.sampler_interface import SamplerInterface
4949
from metadata.utils.dependency_injector.dependency_injector import (
5050
DependencyNotFoundError,
@@ -141,6 +141,19 @@ def _copy_service_config(
141141

142142
return config_copy
143143

144+
def _build_default_sample_config(self) -> SampleConfig:
145+
"""Build a SampleConfig from the pipeline's profileSampleConfig."""
146+
profile_sample_config = None
147+
raw = self.source_config.profileSampleConfig if self.source_config else None
148+
if raw:
149+
profile_sample_config = ProfileSampleConfig.model_validate(raw.model_dump())
150+
return SampleConfig(
151+
profileSampleConfig=profile_sample_config,
152+
randomizedSample=self.source_config.randomizedSample
153+
if self.source_config
154+
else False,
155+
)
156+
144157
@inject
145158
def create_profiler_interface(
146159
self,
@@ -177,12 +190,7 @@ def create_profiler_interface(
177190
schema_entity=schema_entity,
178191
database_entity=database_entity,
179192
table_config=config,
180-
default_sample_config=SampleConfig(
181-
profileSample=self.source_config.profileSample,
182-
profileSampleType=self.source_config.profileSampleType,
183-
samplingMethodType=self.source_config.samplingMethodType,
184-
randomizedSample=self.source_config.randomizedSample,
185-
),
193+
default_sample_config=self._build_default_sample_config(),
186194
# TODO: Change this when we have the processing engine configuration implemented. Right now it does nothing.
187195
processing_engine=self.get_processing_engine(self.source_config),
188196
)

0 commit comments

Comments
 (0)