@@ -221,6 +221,44 @@ def test_incremental_timestamp(self):
221221 }, schema = self .pa_schema )
222222 self .assertEqual (expected , actual )
223223
def test_incremental_read_multi_snapshots(self):
    """Incremental read between two snapshot timestamps returns only the rows in that window.

    Performs 100 separate single-row commits (user_id 1..100, alternating
    between partitions 'p1' and 'p2'), then reads with
    ``INCREMENTAL_BETWEEN_TIMESTAMP`` set to the commit times of snapshots
    10 and 20. The expected result is exactly rows 11..20, i.e. this test
    expects the start timestamp to be exclusive and the end timestamp to be
    inclusive.
    """
    schema = Schema.from_pyarrow_schema(
        self.pa_schema,
        partition_keys=['dt'],
        primary_keys=['user_id', 'dt'],
        options={'bucket': '2'},
    )
    self.catalog.create_table('default.test_incremental_parquet', schema, False)
    table = self.catalog.get_table('default.test_incremental_parquet')
    write_builder = table.new_batch_write_builder()

    # One write/commit pair per row, so every row lands in its own commit.
    # NOTE(review): presumably commit i yields snapshot id i -- the lookups
    # by id 10 and 20 below rely on that; confirm against SnapshotManager.
    for i in range(1, 101):
        table_write = write_builder.new_write()
        table_commit = write_builder.new_commit()
        try:
            pa_table = pa.Table.from_pydict({
                'user_id': [i],
                'item_id': [1000 + i],
                'behavior': [f'snap{i}'],
                'dt': ['p1' if i % 2 == 1 else 'p2'],
            }, schema=self.pa_schema)
            table_write.write_arrow(pa_table)
            table_commit.commit(table_write.prepare_commit())
        finally:
            # Release the writer and committer even when a write or commit
            # fails, so a broken iteration does not leak open resources.
            table_write.close()
            table_commit.close()

    snapshot_manager = SnapshotManager(table)
    t10 = snapshot_manager.get_snapshot_by_id(10).time_millis
    t20 = snapshot_manager.get_snapshot_by_id(20).time_millis
    # NOTE(review): the window is selected by timestamp, so this assumes
    # neighbouring snapshots do not share the same time_millis value --
    # TODO confirm commits are always spaced far enough apart.

    # The option value is "start,end" with no surrounding whitespace.
    table_inc = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: f"{t10},{t20}"})
    read_builder = table_inc.new_read_builder()
    actual = self._read_test_table(read_builder).sort_by('user_id')

    # Rows written by commits 11..20, built with the same formulas as the
    # write loop above.
    expected_ids = range(11, 21)
    expected = pa.Table.from_pydict({
        'user_id': list(expected_ids),
        'item_id': [1000 + i for i in expected_ids],
        'behavior': [f'snap{i}' for i in expected_ids],
        'dt': ['p1' if i % 2 == 1 else 'p2' for i in expected_ids],
    }, schema=self.pa_schema).sort_by('user_id')
    self.assertEqual(expected, actual)
261+
224262 def _write_test_table (self , table ):
225263 write_builder = table .new_batch_write_builder ()
226264
0 commit comments