@@ -224,6 +224,74 @@ def test_delete_partitioned_table_positional_deletes(spark: SparkSession, sessio
224224 assert tbl .scan ().to_arrow ().to_pydict () == {"number_partitioned" : [10 ], "number" : [20 ]}
225225
226226
@pytest.mark.integration
@pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write")
def test_delete_partitioned_table_positional_deletes_empty_batch(spark: SparkSession, session_catalog: RestCatalog) -> None:
    """Check scans over a partitioned table that carries a Spark-written delete file.

    Spark creates a format-version 2 table partitioned by ``number_partitioned``
    with merge-on-read delete/update/merge modes. Three rows are appended through
    the catalog-loaded table, one of them is deleted through Spark, and the test
    then verifies the planned files and the row counts returned by ``to_arrow``
    and ``to_arrow_batch_reader``, including a filter that matches no rows.
    """
    identifier = "default.test_delete_partitioned_table_positional_deletes_empty_batch"

    drop_statement = f"DROP TABLE IF EXISTS {identifier}"
    # NOTE(review): 'write.parquet.row-group-limit'=1 presumably forces one row per
    # row group so the deleted row leaves an empty batch behind -- confirm.
    create_statement = f"""
            CREATE TABLE {identifier} (
                number_partitioned int,
                number int
            )
            USING iceberg
            PARTITIONED BY (number_partitioned)
            TBLPROPERTIES(
                'format-version' = 2,
                'write.delete.mode'='merge-on-read',
                'write.update.mode'='merge-on-read',
                'write.merge.mode'='merge-on-read',
                'write.parquet.row-group-limit'=1
            )
        """
    run_spark_commands(spark, [drop_statement, create_statement])

    table = session_catalog.load_table(identifier)

    # Three rows, all landing in the same partition (number_partitioned = 10).
    row_schema = pa.schema([pa.field("number_partitioned", pa.int32()), pa.field("number", pa.int32())])
    rows = pa.Table.from_arrays(
        [
            pa.array([10, 10, 10]),
            pa.array([1, 2, 3]),
        ],
        schema=row_schema,
    )
    table.append(rows)

    assert len(table.scan().to_arrow()) == 3

    # Generate a positional delete
    delete_statement = f"""
            DELETE FROM {identifier} WHERE number = 1
        """
    run_spark_commands(spark, [delete_statement])

    # Assert that there is just a single Parquet file, that has one merge on read file
    table = table.refresh()

    planned_tasks = list(table.scan().plan_files())
    assert len(planned_tasks) == 1
    assert len(planned_tasks[0].delete_files) == 1

    # The deleted row must be gone from an unfiltered scan.
    assert len(table.scan().to_arrow()) == 2

    # Filtering on the only populated partition value returns the same two rows.
    matching_rows = table.scan(row_filter="number_partitioned == 10").to_arrow()
    assert len(matching_rows) == 2

    # A filter that matches nothing must yield an empty result from both read paths.
    no_rows = table.scan(row_filter="number_partitioned == 1").to_arrow()
    assert len(no_rows) == 0

    batch_reader = table.scan(row_filter="number_partitioned == 1").to_arrow_batch_reader()
    assert isinstance(batch_reader, pa.RecordBatchReader)
    assert len(batch_reader.read_all()) == 0
293+
294+
227295@pytest .mark .integration
228296@pytest .mark .filterwarnings ("ignore:Merge on read is not yet supported, falling back to copy-on-write" )
229297def test_overwrite_partitioned_table (spark : SparkSession , session_catalog : RestCatalog ) -> None :
0 commit comments