Skip to content

Commit 8982585

Browse files
rederik76rederik76haillew
authored
Fix: allow operational metadata to be disabled on MVs and update docs (#21)
* updated docs, add configFlags to materialized view spec * updated docs, add configFlags to materialized view spec * fix operational metadata disable handling for MVs * fix periodic snapshot samples to exclude op metadata * Clarify documentation on disabling operational metadata at dataflow spec and target table levels --------- Co-authored-by: rederik76 <rederik76@gmail.com> Co-authored-by: Haille W <haille.woldegebriel@databricks.com>
1 parent f675c19 commit 8982585

12 files changed

Lines changed: 151 additions & 23 deletions

File tree

docs/source/feature_operational_metadata.rst

Lines changed: 118 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,4 +190,121 @@ The below example illustrates the default configuration for a generic bronze and
190190
}
191191
}
192192
]
193-
}
193+
}
194+
195+
Disabling Operational Metadata in a Dataflow Spec
196+
-------------------------------------------------
197+
You can disable operational metadata either at the dataflow spec level or at the target table level.
198+
199+
Disabling at Dataflow Spec Level
200+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
201+
202+
Use the ``features`` object to disable operational metadata at the dataflow spec level.
203+
204+
.. tabs::
205+
206+
.. tab:: JSON
207+
208+
.. code-block:: json
209+
:emphasize-lines: 5,6,7
210+
211+
{
212+
"dataFlowId": "feature_materialized_views",
213+
"dataFlowGroup": "feature_samples",
214+
"dataFlowType": "materialized_view",
215+
"features": {
216+
"operationalMetadataEnabled": false
217+
}
218+
}
219+
220+
.. tab:: YAML
221+
222+
.. code-block:: yaml
223+
:emphasize-lines: 4,5
224+
225+
dataFlowId: feature_materialized_views
226+
dataFlowGroup: feature_samples
227+
dataFlowType: materialized_view
228+
features:
229+
operationalMetadataEnabled: false
230+
231+
Disabling at Target Table Level
232+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
233+
234+
Use the ``configFlags`` array to disable operational metadata at the target table level.
235+
236+
.. tabs::
237+
238+
.. tab:: JSON
239+
240+
.. code-block:: json
241+
:emphasize-lines: 24
242+
243+
{
244+
"dataFlowId": "crm_1",
245+
"dataFlowGroup": "crm",
246+
"dataFlowType": "standard",
247+
"sourceType": "delta",
248+
"sourceSystem": "crm",
249+
"sourceViewName": "v_customer_address",
250+
"sourceDetails": {
251+
"database": "source_db",
252+
"table": "customer_address",
253+
"cdfEnabled": true,
254+
"schemaPath": "schemas/customer_address.json"
255+
},
256+
"mode": "stream",
257+
"targetFormat": "delta",
258+
"targetDetails": {
259+
"table": "customer_address",
260+
"tableProperties": {
261+
"delta.autoOptimize.optimizeWrite": "true",
262+
"delta.autoOptimize.autoCompact": "true"
263+
},
264+
"partitionColumns": ["country_code"],
265+
"schemaPath": "schemas/customer_address.json",
266+
"configFlags": ["disableOperationalMetadata"]
267+
},
268+
"dataQualityExpectationsEnabled": true,
269+
"quarantineMode": "table",
270+
"quarantineTargetDetails": {
271+
"targetFormat": "delta",
272+
"table": "customer_address_quarantine",
273+
"tableProperties": {}
274+
}
275+
}
276+
277+
.. tab:: YAML
278+
279+
.. code-block:: yaml
280+
:emphasize-lines: 22,23
281+
282+
dataFlowId: crm_1
283+
dataFlowGroup: crm
284+
dataFlowType: standard
285+
sourceType: delta
286+
sourceSystem: crm
287+
sourceViewName: v_customer_address
288+
sourceDetails:
289+
database: source_db
290+
table: customer_address
291+
cdfEnabled: true
292+
schemaPath: schemas/customer_address.json
293+
mode: stream
294+
targetFormat: delta
295+
targetDetails:
296+
table: customer_address
297+
tableProperties:
298+
delta.autoOptimize.optimizeWrite: 'true'
299+
delta.autoOptimize.autoCompact: 'true'
300+
partitionColumns:
301+
- country_code
302+
schemaPath: schemas/customer_address.json
303+
configFlags:
304+
- disableOperationalMetadata
305+
dataQualityExpectationsEnabled: true
306+
quarantineMode: table
307+
quarantineTargetDetails:
308+
targetFormat: delta
309+
table: customer_address_quarantine
310+
tableProperties: {}

samples/bronze_sample/src/dataflows/feature_samples/dataflowspec/periodic_snapshot_scd1_main.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424
"tableProperties": {
2525
"delta.enableChangeDataFeed": "true"
2626
},
27-
"schemaPath": "target/feature_periodic_snapshot_customer_schema.json"
27+
"schemaPath": "target/feature_periodic_snapshot_customer_schema.json",
28+
"configFlags": ["disableOperationalMetadata"]
2829
},
2930
"cdcSnapshotSettings": {
3031
"snapshotType": "periodic",

samples/bronze_sample/src/dataflows/feature_samples/dataflowspec/periodic_snapshot_scd2_main.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424
"tableProperties": {
2525
"delta.enableChangeDataFeed": "true"
2626
},
27-
"schemaPath": "target/feature_periodic_snapshot_customer_schema.json"
27+
"schemaPath": "target/feature_periodic_snapshot_customer_schema.json",
28+
"configFlags": ["disableOperationalMetadata"]
2829
},
2930
"cdcSnapshotSettings": {
3031
"snapshotType": "periodic",

src/dataflow/dataflow.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -110,13 +110,6 @@ def _init_target_details(self):
110110
self.logger.info(f"Initializing DataFlow for target schema: {self.target_database}, {log_target}")
111111
self.logger.debug(f"Target Details: {self.target_details.__dict__}")
112112

113-
# Add operational metadata columns to the schema
114-
if not hasattr(self.target_details, 'schema') or not self.target_details.schema:
115-
return
116-
117-
if not self.features.operationalMetadataEnabled:
118-
return
119-
120113
def _init_cdc_settings(self):
121114
"""init CDC settings."""
122115

@@ -247,7 +240,7 @@ def create_dataflow(self):
247240
self._create_flow_groups()
248241

249242
# create materialized view
250-
self.target_details.create_table(expectations)
243+
self.target_details.create_table(expectations, features=self.features)
251244

252245
elif self.dataflow_spec.targetFormat in SinkType.__dict__.values():
253246

src/dataflow/targets/base.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import utility
1111

1212
from ..enums import TableType, TargetConfigFlags
13+
from ..features import Features
1314

1415
Self = TypeVar("Self", bound="BaseTargetDelta")
1516

@@ -250,7 +251,8 @@ def add_table_properties(self, table_properties: Dict) -> Self:
250251

251252
def create_table(
252253
self,
253-
expectations: Dict = None
254+
expectations: Dict = None,
255+
features: Features = None
254256
) -> None:
255257
"""
256258
Create the target table for the data flow.
@@ -277,13 +279,14 @@ def create_table(
277279
logger.debug(f"Expectations: {self.table}, {expectations}")
278280
logger.debug(f"Config Flags: {self.configFlags}")
279281

280-
self._create_table(schema, expectations)
282+
self._create_table(schema, expectations, features)
281283

282284
@abstractmethod
283285
def _create_table(
284286
self,
285287
schema: T.StructType | str,
286-
expectations: Dict = None
288+
expectations: Dict = None,
289+
features: Features = None
287290
) -> None:
288291
"""Abstract implementation for target specific table creation logic."""
289292
pass

src/dataflow/targets/delta_materialized_view.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from pyspark import pipelines as dp
55
from pyspark.sql import types as T
66

7+
from ..features import Features
78
from ..operational_metadata import OperationalMetadataMixin
89
from ..sql import SqlMixin
910

@@ -50,7 +51,8 @@ class TargetDeltaMaterializedView(BaseTargetDelta, SqlMixin, OperationalMetadata
5051
def _create_table(
5152
self,
5253
schema: T.StructType | str,
53-
expectations: Dict = None
54+
expectations: Dict = None,
55+
features: Features = None
5456
) -> None:
5557
"""Create the target table for the data flow."""
5658
spark = self.spark
@@ -98,7 +100,8 @@ def mv_query():
98100
df = spark.sql(sql)
99101

100102
# Add operational metadata if needed
101-
if operational_metadata_schema:
103+
operational_metadata_enabled = features.operationalMetadataEnabled if features else True
104+
if operational_metadata_schema and operational_metadata_enabled:
102105
df = self._add_operational_metadata(
103106
spark,
104107
df,

src/dataflow/targets/delta_streaming_table.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from pyspark.sql import types as T
66

77
from .base import BaseTargetDelta
8+
from ..features import Features
89

910

1011
@dataclass(kw_only=True)
@@ -40,7 +41,8 @@ class TargetDeltaStreamingTable(BaseTargetDelta):
4041
def _create_table(
4142
self,
4243
schema: T.StructType | str,
43-
expectations: Dict = None
44+
expectations: Dict = None,
45+
features: Features = None
4446
) -> None:
4547
"""Create the target table for the data flow."""
4648
dp.create_streaming_table(

src/dataflow_spec_builder/dataflow_spec_builder.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,6 @@ def _process_spec_data(self, base_path: str, spec_data: Dict) -> None:
514514

515515
# Substitute secrets in the dataflow spec with SecretValue objects
516516
spec_data = self.secrets_manager.substitute_secrets(spec_data)
517-
518517
self.logger.info(f"Adding Dataflow Spec: {spec_data.get(self.Keys.DATA_FLOW_ID)}.")
519518
self.processed_specs.append(DataflowSpec(**spec_data))
520519

src/dataflow_spec_builder/transformer/base.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,19 @@ def _process_spec(self, spec_data: Dict) -> Union[Dict, List[Dict]]:
1818

1919
def transform(self, spec_data: Dict) -> Union[Dict, List[Dict]]:
2020
"""Transform the spec data. Returns either a single Dict or List[Dict]."""
21+
spec_data = self._apply_features_and_limitations(spec_data)
2122
return self._process_spec(spec_data)
2223

2324
def _apply_features_and_limitations(self, dataflow_spec: Dict) -> Dict:
2425
"""Apply common features and limitations transformations."""
2526
# Operational MetadataSnapshot
2627
features = dataflow_spec.get("features", {})
27-
2828
if not features:
2929
dataflow_spec["features"] = {}
3030

3131
# FEATURE: Operational Metadata
32-
operational_metadata_enabled = features.get("operationalMetadataEnabled", None)
33-
if not operational_metadata_enabled:
34-
dataflow_spec["features"]["operationalMetadataEnabled"] = True
32+
operational_metadata_enabled = features.get("operationalMetadataEnabled", True)
33+
dataflow_spec["features"]["operationalMetadataEnabled"] = operational_metadata_enabled
3534

3635
# LIMITATIONS: CDC SNAPSHOT
3736
if dataflow_spec.get("cdcSnapshotSettings"):

src/dataflow_spec_builder/transformer/materialized_views.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ def _build_base_flow_spec(self, spec_data: Dict, mv_config: Dict, target_details
6060
"dataFlowId": spec_data.get("dataFlowId"),
6161
"dataFlowGroup": spec_data.get("dataFlowGroup"),
6262
"dataFlowType": spec_data.get("dataFlowType"),
63+
"features": spec_data.get("features", {}),
6364
"targetFormat": TargetType.DELTA,
6465
"targetDetails": target_details,
6566
"quarantineMode": mv_config.get("quarantineMode"),

0 commit comments

Comments
 (0)