Skip to content

Commit 37344d0

Browse files
committed
fix: Fixed missed in PR 6343
Signed-off-by: ntkathole <nikhilkathole2683@gmail.com>
1 parent fffd613 commit 37344d0

1 file changed

Lines changed: 14 additions & 2 deletions

File tree

  • sdk/python/feast/infra/compute_engines/ray

sdk/python/feast/infra/compute_engines/ray/compute.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,19 @@ def _materialize_from_offline_store(
185185
if isinstance(retrieval_job, RayRetrievalJob):
186186
ray_ds = retrieval_job.to_ray_dataset()
187187

188+
needs_offline = getattr(feature_view, "offline", False)
189+
sink_source = getattr(feature_view, "sink_source", None)
190+
191+
# Materialise the lazy pipeline ONCE into Ray object-store memory
192+
# when more than one write target will consume it.
193+
# Without this, each use of ray_ds (online write via map_batches,
194+
# to_arrow_refs for offline_write_batch, write_parquet for
195+
# sink_source) independently re-executes the full source-read
196+
# pipeline, producing up to three separate data snapshots that can
197+
# diverge when the source changes between executions.
198+
if needs_offline or sink_source is not None:
199+
ray_ds = ray_ds.materialize()
200+
188201
# Distributed online store write — each Ray worker writes its shard
189202
write_to_online_store_from_ray_ds(
190203
ray_ds=ray_ds,
@@ -195,7 +208,7 @@ def _materialize_from_offline_store(
195208

196209
# offline_write_batch and sink_source are independent — both can
197210
# apply when a feature view has offline=True AND a sink_source.
198-
if getattr(feature_view, "offline", False):
211+
if needs_offline:
199212
import pyarrow as pa
200213
import ray as _ray
201214

@@ -207,7 +220,6 @@ def _materialize_from_offline_store(
207220
progress=lambda x: None,
208221
)
209222

210-
sink_source = getattr(feature_view, "sink_source", None)
211223
if sink_source is not None:
212224
logger.debug(
213225
f"Writing derived view {feature_view.name} to sink_source: {sink_source.path}"

0 commit comments

Comments
 (0)