@@ -21,6 +21,7 @@ class CreateIcebergMaterializedView(SQLHandler):
2121 ON iceberg_table
2222 [ catalog ]
2323 [ storage ]
24+ [ where ]
2425 ;
2526
2627 # If not exists
@@ -42,6 +43,9 @@ class CreateIcebergMaterializedView(SQLHandler):
4243 _link_config = S3 CONFIG '<link-config>'
4344 _link_creds = CREDENTIALS '<link-creds>'
4445
46+ # Where clause
47+ where = WHERE <expr>
48+
4549 Description
4650 -----------
4751 Create an Iceberg materialized view that syncs data from an Iceberg table
@@ -107,6 +111,10 @@ def run(self, params: Dict[str, Any]) -> Optional[result.FusionSQLResult]:
107111 '[catalog.]database.table' ,
108112 )
109113
114+ where = ''
115+ if params .get ('where' ):
116+ where = f' WHERE { params ["where" ]} '
117+
110118 # Iceberg expects lowercase
111119 if iceberg_database :
112120 iceberg_database = iceberg_database .lower ()
@@ -180,21 +188,26 @@ def run(self, params: Dict[str, Any]) -> Optional[result.FusionSQLResult]:
180188 with connect () as conn :
181189 with conn .cursor () as cur :
182190 # Infer and create the pipeline.
183- # It also creates a table (and optionally a view in case of merge pipeline) with the same name
191+ # It also creates a table (and optionally a view in case of
192+ # merge pipeline) with the same name
184193 cur .execute (rf'''
185194 CREATE INFERRED PIPELINE `{ pipeline_name } ` AS
186195 LOAD DATA S3 '{ table_id } '
187196 CONFIG '{ config_json } '
188197 CREDENTIALS '{ creds_json } '
189198 FORMAT ICEBERG
190199 OPTIONS = 'merge'
200+ { where }
191201 ''' )
192202
193203 # Start the pipeline
194204 cur .execute (rf'START PIPELINE `{ pipeline_name } `' )
195205
196206 # Create view with user-provided name
197- cur .execute (rf'CREATE VIEW `{ view_database } `.`{ view_table } ` AS SELECT * FROM `{ pipeline_name } `' )
207+ cur .execute (rf'''
208+ CREATE VIEW `{ view_database } `.`{ view_table } `
209+ AS SELECT * FROM `{ pipeline_name } `
210+ ''' )
198211
199212 # Return result
200213 res = result .FusionSQLResult ()
0 commit comments