Skip to content

Commit 79e5b94

Browse files
feat(materialization): add time chunks management
Signed-off-by: Alan Gauthier <alan.gauthier@jobteaser.com>
1 parent d41becf commit 79e5b94

6 files changed

Lines changed: 942 additions & 35 deletions

File tree

sdk/python/feast/cli/cli.py

Lines changed: 70 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,7 @@
1313
# limitations under the License.
1414
import json
1515
import logging
16-
from datetime import datetime
16+
from datetime import datetime, timedelta
1717
from importlib.metadata import version as importlib_version
1818
from pathlib import Path
1919
from typing import List, Optional
@@ -366,6 +366,24 @@ def registry_dump_command(ctx: click.Context):
366366
default=None,
367367
help="Version to materialize (e.g., 'v2'). Requires --views with exactly one feature view.",
368368
)
369+
@click.option(
370+
"--chunk-hours",
371+
type=int,
372+
default=None,
373+
help="Split materialization into N-hour chunks to reduce memory pressure.",
374+
)
375+
@click.option(
376+
"--chunk-minutes",
377+
type=int,
378+
default=None,
379+
help="Split materialization into N-minute chunks (finer granularity than --chunk-hours).",
380+
)
381+
@click.option(
382+
"--chunk-seconds",
383+
type=int,
384+
default=None,
385+
help="Split materialization into N-second chunks (finest granularity).",
386+
)
369387
@click.pass_context
370388
def materialize_command(
371389
ctx: click.Context,
@@ -374,6 +392,9 @@ def materialize_command(
374392
views: List[str],
375393
disable_event_timestamp: bool,
376394
feature_view_version: Optional[str],
395+
chunk_hours: Optional[int],
396+
chunk_minutes: Optional[int],
397+
chunk_seconds: Optional[int],
377398
):
378399
"""
379400
Run a (non-incremental) materialization job to ingest data into the online store. Feast
@@ -387,6 +408,18 @@ def materialize_command(
387408
"""
388409
store = create_feature_store(ctx)
389410

411+
if sum(v is not None for v in [chunk_hours, chunk_minutes, chunk_seconds]) > 1:
412+
raise click.UsageError(
413+
"Only one of --chunk-hours, --chunk-minutes, --chunk-seconds may be specified."
414+
)
415+
chunk_size: Optional[timedelta] = None
416+
if chunk_hours is not None:
417+
chunk_size = timedelta(hours=chunk_hours)
418+
elif chunk_minutes is not None:
419+
chunk_size = timedelta(minutes=chunk_minutes)
420+
elif chunk_seconds is not None:
421+
chunk_size = timedelta(seconds=chunk_seconds)
422+
390423
if disable_event_timestamp:
391424
if start_ts or end_ts:
392425
raise click.UsageError(
@@ -412,6 +445,7 @@ def materialize_command(
412445
end_date=end_date,
413446
disable_event_timestamp=disable_event_timestamp,
414447
version=feature_view_version,
448+
chunk_size=chunk_size,
415449
)
416450

417451

@@ -429,12 +463,33 @@ def materialize_command(
429463
default=None,
430464
help="Version to materialize (e.g., 'v2'). Requires --views with exactly one feature view.",
431465
)
466+
@click.option(
467+
"--chunk-hours",
468+
type=int,
469+
default=None,
470+
help="Split materialization into N-hour chunks to reduce memory pressure.",
471+
)
472+
@click.option(
473+
"--chunk-minutes",
474+
type=int,
475+
default=None,
476+
help="Split materialization into N-minute chunks (finer granularity than --chunk-hours).",
477+
)
478+
@click.option(
479+
"--chunk-seconds",
480+
type=int,
481+
default=None,
482+
help="Split materialization into N-second chunks (finest granularity).",
483+
)
432484
@click.pass_context
433485
def materialize_incremental_command(
434486
ctx: click.Context,
435487
end_ts: str,
436488
views: List[str],
437489
feature_view_version: Optional[str],
490+
chunk_hours: Optional[int],
491+
chunk_minutes: Optional[int],
492+
chunk_seconds: Optional[int],
438493
):
439494
"""
440495
Run an incremental materialization job to ingest new data into the online store. Feast will read
@@ -445,10 +500,24 @@ def materialize_incremental_command(
445500
END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01'
446501
"""
447502
store = create_feature_store(ctx)
503+
504+
if sum(v is not None for v in [chunk_hours, chunk_minutes, chunk_seconds]) > 1:
505+
raise click.UsageError(
506+
"Only one of --chunk-hours, --chunk-minutes, --chunk-seconds may be specified."
507+
)
508+
chunk_size: Optional[timedelta] = None
509+
if chunk_hours is not None:
510+
chunk_size = timedelta(hours=chunk_hours)
511+
elif chunk_minutes is not None:
512+
chunk_size = timedelta(minutes=chunk_minutes)
513+
elif chunk_seconds is not None:
514+
chunk_size = timedelta(seconds=chunk_seconds)
515+
448516
store.materialize_incremental(
449517
feature_views=None if not views else views,
450518
end_date=utils.make_tzaware(datetime.fromisoformat(end_ts)),
451519
version=feature_view_version,
520+
chunk_size=chunk_size,
452521
)
453522

454523

0 commit comments

Comments
 (0)