@@ -185,6 +185,19 @@ def _materialize_from_offline_store(
     if isinstance(retrieval_job, RayRetrievalJob):
         ray_ds = retrieval_job.to_ray_dataset()
 
+        needs_offline = getattr(feature_view, "offline", False)
+        sink_source = getattr(feature_view, "sink_source", None)
+
+        # Materialise the lazy pipeline ONCE into Ray object-store memory
+        # when more than one write target will consume it.
+        # Without this, each use of ray_ds (online write via map_batches,
+        # to_arrow_refs for offline_write_batch, write_parquet for
+        # sink_source) independently re-executes the full source-read
+        # pipeline, producing up to three separate data snapshots that can
+        # diverge when the source changes between executions.
+        if needs_offline or sink_source is not None:
+            ray_ds = ray_ds.materialize()
+
         # Distributed online store write — each Ray worker writes its shard
         write_to_online_store_from_ray_ds(
             ray_ds=ray_ds,
@@ -195,7 +208,7 @@ def _materialize_from_offline_store(
 
         # offline_write_batch and sink_source are independent — both can
         # apply when a feature view has offline=True AND a sink_source.
-        if getattr(feature_view, "offline", False):
+        if needs_offline:
             import pyarrow as pa
             import ray as _ray
 
@@ -207,7 +220,6 @@ def _materialize_from_offline_store(
                 progress=lambda x: None,
             )
 
-        sink_source = getattr(feature_view, "sink_source", None)
         if sink_source is not None:
             logger.debug(
                 f"Writing derived view {feature_view.name} to sink_source: {sink_source.path}"