Commit 7cfce88

feat: add refreshPolicy support for Materialized Views (#25)
Adds support for the `refresh_policy` parameter on Materialized Views (beta, requires DBR 17.3+). When `refreshPolicy` is set in the spec, the framework uses `@dp.materialized_view()` instead of `@dp.table()`. Closes #24
1 parent b5b888e commit 7cfce88

6 files changed

Lines changed: 68 additions & 4 deletions

docs/source/dataflow_spec_ref_main_materialized_views.rst

Lines changed: 23 additions & 1 deletion
@@ -39,7 +39,8 @@ The following schema details the configuration for a Materialized View Data Flow
                 "dataQualityExpectationsEnabled": false,
                 "dataQualityExpectationsPath": "",
                 "quarantineMode": "off",
-                "quarantineTargetDetails": {}
+                "quarantineTargetDetails": {},
+                "refreshPolicy": "auto"
             }
         }
     }
@@ -70,6 +71,7 @@ The following schema details the configuration for a Materialized View Data Flow
         dataQualityExpectationsPath: ''
         quarantineMode: 'off'
         quarantineTargetDetails: {}
+        refreshPolicy: auto
 
 Example:
 --------
@@ -121,6 +123,13 @@ The below demonstrates a Materialized View Data Flow Spec:
                 "quarantineTargetDetails": {
                     "targetFormat": "delta"
                 }
+            },
+            "mv_with_refresh_policy": {
+                "sqlStatement": "SELECT * FROM {staging_schema}.customer",
+                "refreshPolicy": "incremental_strict",
+                "tableDetails": {
+                    "configFlags": ["disableOperationalMetadata"]
+                }
             }
         }
     }
@@ -161,6 +170,12 @@ The below demonstrates a Materialized View Data Flow Spec:
         quarantineMode: table
         quarantineTargetDetails:
           targetFormat: delta
+      mv_with_refresh_policy:
+        sqlStatement: SELECT * FROM {staging_schema}.customer
+        refreshPolicy: incremental_strict
+        tableDetails:
+          configFlags:
+            - disableOperationalMetadata
 
 The above dataflow spec sample contains the following core components:
 
@@ -271,6 +286,13 @@ These properties define the materialized view specific configuration:
    * - **private** (*optional*)
      - ``boolean``
      - Create a table, but do not publish the table to the metastore.
+   * - **refreshPolicy** (*optional*)
+     - ``string``
+     - *(Beta — requires DBR 17.3+)* The refresh policy for the materialized view.
+       One of: ``auto``, ``incremental``, ``incremental_strict``, ``full``.
+       When set, the framework uses ``@dp.materialized_view()`` with the ``refresh_policy`` parameter.
+       Note: ``incremental_strict`` disallows non-deterministic functions such as ``current_timestamp()``.
+       When using operational metadata with ``incremental_strict``, add ``disableOperationalMetadata`` to ``configFlags``.
 
 .. _dataflow-spec-materialized-view-data-quality-configuration:
 
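
The ``refreshPolicy`` note added in this file pairs ``incremental_strict`` with the ``disableOperationalMetadata`` flag. A minimal sketch of a pre-flight check for that pairing follows. The helper ``strict_refresh_warnings`` and the ``mv_missing_flag`` entry are hypothetical and not part of this commit, and the check assumes operational metadata is added unless the flag is present, which the diff does not confirm.

```python
from typing import Any, Dict, List


def strict_refresh_warnings(materialized_views: Dict[str, Dict[str, Any]]) -> List[str]:
    """Hypothetical lint helper, not part of the framework.

    Flags materialized views that set refreshPolicy to "incremental_strict"
    without listing "disableOperationalMetadata" in tableDetails.configFlags.
    """
    warnings: List[str] = []
    for name, config in materialized_views.items():
        if config.get("refreshPolicy") != "incremental_strict":
            continue  # other policies are not covered by the documented note
        flags = config.get("tableDetails", {}).get("configFlags", [])
        if "disableOperationalMetadata" not in flags:
            warnings.append(
                f"{name}: incremental_strict is set but "
                "disableOperationalMetadata is missing from configFlags"
            )
    return warnings


spec = {
    # Mirrors the example added to the docs in this commit.
    "mv_with_refresh_policy": {
        "sqlStatement": "SELECT * FROM {staging_schema}.customer",
        "refreshPolicy": "incremental_strict",
        "tableDetails": {"configFlags": ["disableOperationalMetadata"]},
    },
    # Hypothetical entry that omits the flag.
    "mv_missing_flag": {
        "sqlStatement": "SELECT * FROM {staging_schema}.customer",
        "refreshPolicy": "incremental_strict",
    },
}

print(strict_refresh_warnings(spec))
```

Only the hypothetical ``mv_missing_flag`` entry is reported; the entry copied from the docs passes because it carries the flag.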

docs/source/feature_materialized_views.rst

Lines changed: 22 additions & 1 deletion
@@ -12,6 +12,7 @@ Materialized Views
      - - `Materialized Views <https://docs.databricks.com/aws/en/dlt/materialized-views>`_
        - `Delta Live Tables Python Reference <https://docs.databricks.com/en/delta-live-tables/python-ref.html>`_
        - `Delta Live Tables SQL Reference <https://docs.databricks.com/en/delta-live-tables/sql-ref.html>`_
+       - `Incremental Refresh for Materialized Views <https://docs.databricks.com/aws/en/optimizations/incremental-refresh>`_
 
 Overview
 --------
@@ -83,7 +84,8 @@ The following schema details the configuration for a Materialized View Data Flow
                 "dataQualityExpectationsEnabled": false,
                 "dataQualityExpectationsPath": "",
                 "quarantineMode": "off",
-                "quarantineTargetDetails": {}
+                "quarantineTargetDetails": {},
+                "refreshPolicy": "auto"
             }
         }
     }
@@ -114,6 +116,7 @@ The following schema details the configuration for a Materialized View Data Flow
         dataQualityExpectationsPath: ''
         quarantineMode: 'off'
         quarantineTargetDetails: {}
+        refreshPolicy: auto
 
 Source Type Details
 ~~~~~~~~~~~~~~~~~~~
@@ -217,6 +220,11 @@ Materialized Views support several additional configuration options:
   - Set quarantine mode
   - Configure quarantine target details
 
+- **Refresh Policy** *(Beta — requires DBR 17.3+)*
+  - Control how the MV is refreshed: ``auto``, ``incremental``, ``incremental_strict``, or ``full``
+  - When set, the framework uses ``@dp.materialized_view()`` with the ``refresh_policy`` parameter
+  - ``incremental_strict`` disallows non-deterministic functions (e.g. ``current_timestamp()``); use ``disableOperationalMetadata`` in ``configFlags`` when needed
+
 Example Configuration
 -------------------
 
@@ -258,6 +266,13 @@ A complete example of a materialized view configuration:
                 "quarantineTargetDetails": {
                     "targetFormat": "delta"
                 }
+            },
+            "mv_with_refresh_policy": {
+                "sqlStatement": "SELECT * FROM {staging_schema}.customer",
+                "refreshPolicy": "incremental_strict",
+                "tableDetails": {
+                    "configFlags": ["disableOperationalMetadata"]
+                }
             }
         }
     }
@@ -289,5 +304,11 @@ A complete example of a materialized view configuration:
         quarantineMode: table
         quarantineTargetDetails:
           targetFormat: delta
+      mv_with_refresh_policy:
+        sqlStatement: SELECT * FROM {staging_schema}.customer
+        refreshPolicy: incremental_strict
+        tableDetails:
+          configFlags:
+            - disableOperationalMetadata
 
 For more detailed information about configuration options, refer to the :doc:`dataflow_spec_reference` documentation.

samples/bronze_sample/src/dataflows/feature_samples/dataflowspec/materialized_views_main.json

Lines changed: 7 additions & 0 deletions
@@ -41,6 +41,13 @@
                     "spark.sql.session.timeZone": "Australia/Sydney"
                 }
             }
+        },
+        "feature_mv_with_refresh_policy": {
+            "sqlStatement": "SELECT * FROM {staging_schema}.customer",
+            "refreshPolicy": "incremental_strict",
+            "tableDetails": {
+                "configFlags": ["disableOperationalMetadata"]
+            }
         }
     }
 }

src/dataflow/targets/delta_materialized_view.py

Lines changed: 10 additions & 1 deletion
@@ -30,6 +30,9 @@ class TargetDeltaMaterializedView(BaseTargetDelta, SqlMixin, OperationalMetadata
         sqlStatement (str, optional): SQL statement.
         rowFilter (str, optional): Row filter for the target table.
         sparkConf (Dict, optional): Spark configuration for the target table.
+        refreshPolicy (str, optional): Refresh policy for the materialized view.
+            One of: "auto", "incremental", "incremental_strict", "full".
+            Defaults to None (uses the SDP default of "auto").
 
     Properties:
         schema_type (str): Type of schema ["json", "ddl"].
@@ -47,6 +50,7 @@ class TargetDeltaMaterializedView(BaseTargetDelta, SqlMixin, OperationalMetadata
         remove_columns: Remove columns from the target schema.
     """
     sourceView: Optional[str] = None
+    refreshPolicy: Optional[str] = None
 
     def _create_table(
         self,
@@ -80,7 +84,8 @@ def _create_table(
         elif self.rawSql:
             sql = substitution_manager.substitute_string(self.rawSql)
 
-        @dp.table(
+        decorator = dp.materialized_view if self.refreshPolicy else dp.table
+        decorator_kwargs = dict(
             name=self.table,
             comment=self.comment,
             spark_conf=self.sparkConf,
@@ -93,6 +98,10 @@ def _create_table(
             cluster_by_auto=self.clusterByAuto,
             private=self.private
         )
+        if self.refreshPolicy:
+            decorator_kwargs["refresh_policy"] = self.refreshPolicy
+
+        @decorator(**decorator_kwargs)
         @dp.expect_all(expectations.get("expect_all", {}) if expectations else {})
         @dp.expect_all_or_drop(expectations.get("expect_all_or_drop", {}) if expectations else {})
         @dp.expect_all_or_fail(expectations.get("expect_all_or_fail", {}) if expectations else {})
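
The decorator selection in this file can be tried in isolation. The sketch below is an illustration only: ``dp`` is a stand-in built with ``SimpleNamespace`` rather than the real pipelines module, and ``_recording_decorator`` and ``register`` are hypothetical names. It reproduces just the branching from the diff: ``dp.table`` when ``refreshPolicy`` is unset, ``dp.materialized_view`` plus a ``refresh_policy`` keyword when it is set.

```python
from types import SimpleNamespace
from typing import Any, Callable, Dict, Optional


def _recording_decorator(kind: str) -> Callable[..., Callable]:
    """Build a stand-in decorator factory that records how it was called."""
    def factory(**kwargs: Any) -> Callable:
        def wrap(fn: Callable) -> Callable:
            fn.decorated_with = kind
            fn.decorator_kwargs = kwargs
            return fn
        return wrap
    return factory


# Hypothetical stand-in for the module the diff refers to as `dp`.
dp = SimpleNamespace(
    table=_recording_decorator("table"),
    materialized_view=_recording_decorator("materialized_view"),
)


def register(table: str, refresh_policy: Optional[str] = None) -> Callable:
    # Same branching as the diff: fall back to dp.table when no policy is set.
    decorator = dp.materialized_view if refresh_policy else dp.table
    decorator_kwargs: Dict[str, Any] = dict(name=table)
    if refresh_policy:
        # Only pass refresh_policy when the spec provides one.
        decorator_kwargs["refresh_policy"] = refresh_policy

    @decorator(**decorator_kwargs)
    def query() -> str:
        return f"SELECT * FROM {table}"

    return query


default_mv = register("customer")
strict_mv = register("customer", "incremental_strict")
print(default_mv.decorated_with, default_mv.decorator_kwargs)
print(strict_mv.decorated_with, strict_mv.decorator_kwargs)
```

Building the keyword arguments first and adding ``refresh_policy`` conditionally means specs without the field never pass that keyword to ``dp.table``.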

src/dataflow_spec_builder/transformer/materialized_views.py

Lines changed: 2 additions & 1 deletion
@@ -50,7 +50,8 @@ def _build_target_details(self, mv_name: str, mv_config: Dict) -> Dict:
             "table": mv_name,
             "type": TableType.MATERIALIZED_VIEW,
             "sqlPath": mv_config.get("sqlPath"),
-            "sqlStatement": mv_config.get("sqlStatement")
+            "sqlStatement": mv_config.get("sqlStatement"),
+            "refreshPolicy": mv_config.get("refreshPolicy")
         })
         return target_details
 

src/schemas/spec_materialized_views.json

Lines changed: 4 additions & 0 deletions
@@ -50,6 +50,10 @@
             },
             "additionalProperties": false
         },
+        "refreshPolicy": {
+            "type": "string",
+            "enum": ["auto", "incremental", "incremental_strict", "full"]
+        },
         "dataQualityExpectationsEnabled": {"type": "boolean"},
         "dataQualityExpectationsPath": {"type": "string"},
         "quarantineMode": {"type": "string", "enum": ["off", "flag", "table"], "default": "off"},
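
The effect of the new schema entry can be sketched in plain Python. This is not how the framework validates specs (the diff does not show that step); ``refresh_policy_errors`` is a hypothetical helper that mirrors only the ``type`` and ``enum`` constraints added above.

```python
from typing import Any, Dict, List

# Fragment copied from the schema change in this commit.
REFRESH_POLICY_SCHEMA: Dict[str, Any] = {
    "type": "string",
    "enum": ["auto", "incremental", "incremental_strict", "full"],
}


def refresh_policy_errors(mv_config: Dict[str, Any]) -> List[str]:
    """Hypothetical check mirroring the schema's type and enum constraints."""
    if "refreshPolicy" not in mv_config:
        return []  # the docs in this commit describe the field as optional
    value = mv_config["refreshPolicy"]
    if not isinstance(value, str):
        return [f"refreshPolicy must be a string, got {type(value).__name__}"]
    if value not in REFRESH_POLICY_SCHEMA["enum"]:
        allowed = ", ".join(REFRESH_POLICY_SCHEMA["enum"])
        return [f"refreshPolicy {value!r} is not one of: {allowed}"]
    return []


print(refresh_policy_errors({"refreshPolicy": "incremental_strict"}))
print(refresh_policy_errors({"refreshPolicy": "always"}))
```

A spec that omits the field or uses one of the four listed values yields no errors; any other value is reported.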
