@@ -212,21 +212,23 @@ def apply(
212212 other_part = other_axis_partition ._data
213213 args = [other_part ] + list (args )
214214
215- de = self ._apply (func , args , kwargs )
215+ obj , meta , meta_off = self ._apply (func , args , kwargs ). exec ( )
216216 if num_splits > 1 :
217- de = _DeferredSplit (de , self .axis , num_splits , lengths )
217+ obj = _DeferredSplit (obj , self .axis , num_splits , lengths )
218218 if lengths is not None and len (lengths ) != num_splits :
219219 lengths = None
220220 result = [
221221 PandasOnRayDataframePartition (
222222 _DeferredGetChunk (
223- de , i , lengths [i ] if lengths is not None else None
223+ obj , i , lengths [i ] if lengths is not None else None
224224 )
225225 )
226226 for i in range (num_splits )
227227 ]
228228 else :
229- result = [PandasOnRayDataframePartition (de )]
229+ result = [
230+ PandasOnRayDataframePartition (obj , meta = meta , meta_offset = meta_off )
231+ ]
230232 if self .full_axis or other_axis_partition is not None :
231233 return result
232234 else :
@@ -338,7 +340,7 @@ def list_of_block_partitions(self) -> list:
338340 if self ._list_of_block_partitions is not None :
339341 return self ._list_of_block_partitions
340342
341- data = self ._data_ref
343+ data = self ._data
342344 num_splits = self ._num_splits
343345 if num_splits > 1 :
344346 lengths = self ._chunk_lengths
0 commit comments