Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions python/ray/data/_internal/execution/interfaces/task_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@

if TYPE_CHECKING:
from ray.data._internal.execution.operators.map_transformer import MapTransformer
from ray.data._internal.progress.base_progress import BaseProgressBar
from ray.data._internal.progress.base_progress import (
ProgressMetrics,
SubProgressUpdater,
)


_thread_local = threading.local()
Expand All @@ -22,10 +25,13 @@ class TaskContext:
# Name of the operator that this task belongs to.
op_name: str

# The dictionary of sub progress bar to update. The key is name of sub progress
# bar. Note this is only used on driver side.
# Immutable sub-progress snapshots keyed by sub-progress name.
# Note this is only used on the driver side.
# TODO(chengsu): clean it up from TaskContext with new optimizer framework.
sub_progress_bar_dict: Optional[Dict[str, "BaseProgressBar"]] = None
sub_progress_metrics: Optional[Dict[str, "ProgressMetrics"]] = None

# Driver-side helpers for mutating the immutable sub-progress snapshots.
sub_progress_updaters: Optional[Dict[str, "SubProgressUpdater"]] = None

# NOTE(hchen): `upstream_map_transformer` and `upstream_map_ray_remote_args`
# are only used for `RandomShuffle`. DO NOT use them for other operators.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
RefBundle,
TaskContext,
)
from ray.data._internal.execution.operators.sub_progress import SubProgressBarMixin
from ray.data._internal.execution.operators.sub_progress import SubProgressMixin
from ray.data._internal.stats import StatsDict
from ray.data.context import DataContext

if typing.TYPE_CHECKING:
from ray.data._internal.progress.base_progress import BaseProgressBar
from ray.data._internal.progress.base_progress import ProgressMetrics


class InternalQueueOperatorMixin(PhysicalOperator, abc.ABC):
Expand Down Expand Up @@ -100,9 +100,7 @@ def input_dependency(self) -> PhysicalOperator:
return self.input_dependencies[0]


class AllToAllOperator(
InternalQueueOperatorMixin, SubProgressBarMixin, PhysicalOperator
):
class AllToAllOperator(InternalQueueOperatorMixin, SubProgressMixin, PhysicalOperator):
"""A blocking operator that executes once its inputs are complete.

This operator implements distributed sort / shuffle operations, etc.
Expand Down Expand Up @@ -135,8 +133,17 @@ def __init__(
self._next_task_index = 0
self._num_outputs = num_outputs
self._output_rows = 0
# Keep the legacy attribute name during the transition. Some internal
# call sites still read this field directly instead of using the mixin.
self._sub_progress_bar_names = sub_progress_bar_names
self._sub_progress_bar_dict = None
(
self._sub_progress_metrics,
self._sub_progress_updaters,
) = (
self._create_sub_progress_state(sub_progress_bar_names)
if sub_progress_bar_names is not None
else (None, None)
)
self._input_buffer: FIFOBundleQueue = FIFOBundleQueue()
self._output_buffer: FIFOBundleQueue = FIFOBundleQueue()
self._stats: StatsDict = {}
Expand Down Expand Up @@ -176,7 +183,8 @@ def all_inputs_done(self) -> None:
ctx = TaskContext(
task_idx=self._next_task_index,
op_name=self.name,
sub_progress_bar_dict=self._sub_progress_bar_dict,
sub_progress_metrics=self._sub_progress_metrics,
sub_progress_updaters=self._sub_progress_updaters,
target_max_block_size_override=self.target_max_block_size_override,
)
# NOTE: We don't account object store memory use from intermediate `bulk_fn`
Expand Down Expand Up @@ -213,13 +221,11 @@ def get_transformation_fn(self) -> AllToAllTransformFn:
def progress_str(self) -> str:
"""Return a short progress summary of the form "<N> rows output" (N is 0 when the total is falsy)."""
return f"{self.num_output_rows_total() or 0} rows output"

def get_sub_progress_bar_names(self) -> Optional[List[str]]:
return self._sub_progress_bar_names
def get_sub_progress_metrics(self) -> Optional[dict[str, "ProgressMetrics"]]:
"""Return the sub-progress metrics keyed by name, or None when no sub-progress names were given."""
return self._sub_progress_metrics

def set_sub_progress_bar(self, name: str, pg: "BaseProgressBar"):
if self._sub_progress_bar_dict is None:
self._sub_progress_bar_dict = {}
self._sub_progress_bar_dict[name] = pg
def get_sub_progress_updaters(self):
"""Return the sub-progress updaters keyed by name, or None when no sub-progress names were given."""
return self._sub_progress_updaters

def supports_fusion(self):
return True
Expand Down
35 changes: 16 additions & 19 deletions python/ray/data/_internal/execution/operators/hash_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import random
import threading
import time
import typing
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import (
Expand Down Expand Up @@ -54,9 +53,10 @@
TaskExecDriverStats,
estimate_total_num_of_blocks,
)
from ray.data._internal.execution.operators.sub_progress import SubProgressBarMixin
from ray.data._internal.execution.operators.sub_progress import SubProgressMixin
from ray.data._internal.logical.interfaces import LogicalOperator
from ray.data._internal.output_buffer import BlockOutputBuffer, OutputBlockSizeOption
from ray.data._internal.progress.base_progress import ProgressMetrics
from ray.data._internal.stats import OpRuntimeMetrics
from ray.data._internal.table_block import TableBlockAccessor
from ray.data._internal.util import GiB, MiB
Expand All @@ -77,9 +77,6 @@
DataContext,
)

if typing.TYPE_CHECKING:
from ray.data._internal.progress.base_progress import BaseProgressBar

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -468,7 +465,7 @@ def _derive_max_shuffle_aggregators(
)


class HashShufflingOperatorBase(PhysicalOperator, SubProgressBarMixin):
class HashShufflingOperatorBase(PhysicalOperator, SubProgressMixin):
"""Physical operator base-class for any operators requiring hash-based
shuffling.

Expand Down Expand Up @@ -647,10 +644,11 @@ def __init__(
self._health_monitoring_start_time: float = 0.0
self._pending_aggregators_refs: Optional[List[ObjectRef[ActorHandle]]] = None

# sub-progress bar initializations
self._shuffle_bar = None
(
self._sub_progress_metrics,
self._sub_progress_updaters,
) = self._create_sub_progress_state([self.shuffle_name, self.reduce_name])
self._shuffle_metrics = OpRuntimeMetrics(self)
self._reduce_bar = None
self._reduce_metrics = OpRuntimeMetrics(self)

def start(self, options: ExecutionOptions) -> None:
Expand Down Expand Up @@ -771,7 +769,9 @@ def _on_partitioning_done(cur_shuffle_task_idx: int):
)

# Advance the shuffle sub-progress by the number of rows just partitioned
self._shuffle_bar.update(increment=input_block_metadata.num_rows or 0)
self._sub_progress_updaters[self.shuffle_name].update(
increment=input_block_metadata.num_rows or 0
)

# TODO update metrics
task = self._shuffling_tasks[input_index][
Expand Down Expand Up @@ -807,7 +807,7 @@ def _on_partitioning_done(cur_shuffle_task_idx: int):
self._shuffle_metrics,
total_num_tasks=None,
)
self._shuffle_bar.update(total=num_rows)
self._sub_progress_updaters[self.shuffle_name].update(total=num_rows)

def has_next(self) -> bool:
self._try_finalize()
Expand Down Expand Up @@ -881,7 +881,7 @@ def _on_bundle_ready(partition_id: int, bundle: RefBundle):
self._estimated_output_num_rows = num_rows

# Update the reduce (finalization) sub-progress
self._reduce_bar.update(
self._sub_progress_updaters[self.reduce_name].update(
increment=bundle.num_rows() or 0, total=self.num_output_rows_total()
)

Expand Down Expand Up @@ -1280,14 +1280,11 @@ def _get_min_max_partition_shards_compaction_thresholds(
) -> Optional[Tuple[int, int]]:
return None

def get_sub_progress_bar_names(self) -> Optional[List[str]]:
return [self.shuffle_name, self.reduce_name]
def get_sub_progress_metrics(self) -> Optional[Dict[str, ProgressMetrics]]:
"""Return the sub-progress metrics created in __init__ for the shuffle and reduce names."""
return self._sub_progress_metrics

def set_sub_progress_bar(self, name: str, pg: "BaseProgressBar"):
if self.shuffle_name == name:
self._shuffle_bar = pg
elif self.reduce_name == name:
self._reduce_bar = pg
def get_sub_progress_updaters(self):
"""Return the sub-progress updaters created in __init__ for the shuffle and reduce names."""
return self._sub_progress_updaters


class HashShuffleOperator(HashShufflingOperatorBase):
Expand Down
51 changes: 34 additions & 17 deletions python/ray/data/_internal/execution/operators/sub_progress.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,48 @@
import typing
from abc import ABC, abstractmethod
from typing import List, Optional
from typing import Dict, List, Optional, Tuple

if typing.TYPE_CHECKING:
from ray.data._internal.progress.base_progress import BaseProgressBar
from ray.data._internal.progress.base_progress import (
ProgressMetrics,
SubProgressUpdater,
)


class SubProgressBarMixin(ABC):
"""Abstract class for operators that support sub-progress bars"""
class SubProgressMixin(ABC):
"""Abstract class for operators that support driver-side sub-progress tracking."""

@abstractmethod
def get_sub_progress_bar_names(self) -> Optional[List[str]]:
def get_sub_progress_metrics(self) -> Optional[Dict[str, "ProgressMetrics"]]:
"""
Returns list of sub-progress bar names

This is used to create the sub-progress bars in the progress manager.
Note that sub-progress bars will be created in the order returned by
this method.
Returns sub-progress metrics keyed by sub-progress name.
"""
...

@abstractmethod
def set_sub_progress_bar(self, name: str, pg: "BaseProgressBar"):
"""
Sets sub-progress bars

name: name of sub-progress bar
pg: a progress bar. Can be sub-progress bars for rich, tqdm, etc.
"""
def get_sub_progress_updaters(self) -> Optional[Dict[str, "SubProgressUpdater"]]:
"""Returns driver-side helpers for mutating sub-progress metrics."""
...

@classmethod
def _create_sub_progress_state(
    cls, names: List[str]
) -> Tuple[Dict[str, "ProgressMetrics"], Dict[str, "SubProgressUpdater"]]:
    """Build the initial sub-progress state for the given names.

    Args:
        names: Sub-progress names to create entries for.

    Returns:
        A ``(metrics, updaters)`` pair of dictionaries, both keyed by name.
        Each metrics entry starts with ``total=None`` and ``completed=0``.
        Every updater is constructed with the same ``metrics`` dictionary
        that is returned here.
    """
    # Runtime import is local to this method; at module level these names
    # are only imported under TYPE_CHECKING.
    from ray.data._internal.progress.base_progress import (
        BaseExecutionProgressManager,
        ProgressMetrics,
        SubProgressUpdater,
    )

    # Populate every metrics entry first so that each updater below is
    # handed the fully built mapping.
    metrics: Dict[str, "ProgressMetrics"] = {}
    for metric_name in names:
        metrics[metric_name] = ProgressMetrics(
            name=metric_name, total=None, completed=0
        )

    updaters: Dict[str, "SubProgressUpdater"] = {}
    for updater_name in names:
        updaters[updater_name] = SubProgressUpdater(
            metrics,
            name=updater_name,
            max_name_length=BaseExecutionProgressManager.MAX_NAME_LENGTH,
        )

    return metrics, updaters
42 changes: 21 additions & 21 deletions python/ray/data/_internal/gpu_shuffle/hash_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import logging
import pickle
import time
import typing
from typing import (
Dict,
Iterator,
Expand Down Expand Up @@ -31,15 +30,12 @@
from ray.data._internal.execution.operators.hash_shuffle import (
_get_total_cluster_resources,
)
from ray.data._internal.execution.operators.sub_progress import SubProgressBarMixin
from ray.data._internal.execution.operators.sub_progress import SubProgressMixin
from ray.data._internal.progress.base_progress import ProgressMetrics
from ray.data._internal.stats import OpRuntimeMetrics
from ray.data.block import Block, BlockAccessor, BlockStats, to_stats
from ray.data.context import DataContext

if typing.TYPE_CHECKING:

from ray.data._internal.progress.base_progress import BaseProgressBar

logger = logging.getLogger(__name__)

# Arrow schema metadata key for the rapidsmpf partition ID.
Expand Down Expand Up @@ -398,7 +394,7 @@ def _derive_num_gpu_ranks(data_context: DataContext) -> int:
# ---------------------------------------------------------------------------


class GPUShuffleOperator(PhysicalOperator, SubProgressBarMixin):
class GPUShuffleOperator(PhysicalOperator, SubProgressMixin):
"""GPU-native shuffle operator using RAPIDS MPF + UCXX.

Unlike the CPU ``HashShuffleOperator``, this operator:
Expand All @@ -421,6 +417,9 @@ class GPUShuffleOperator(PhysicalOperator, SubProgressBarMixin):
single call.
"""

GPU_SHUFFLE_PROGRESS_NAME = "GPU Shuffle"
GPU_REDUCE_PROGRESS_NAME = "GPU Reduce"

def __init__(
self,
input_op: PhysicalOperator,
Expand Down Expand Up @@ -469,9 +468,12 @@ def __init__(
self._shuffled_blocks_stats: List[BlockStats] = []
self._output_blocks_stats: List[BlockStats] = []

# Progress bars (populated by SubProgressBarMixin callbacks)
self._shuffle_bar = None
self._reduce_bar = None
(
self._sub_progress_metrics,
self._sub_progress_updaters,
) = self._create_sub_progress_state(
[self.GPU_SHUFFLE_PROGRESS_NAME, self.GPU_REDUCE_PROGRESS_NAME]
)

# Metrics
self._shuffle_metrics = OpRuntimeMetrics(self)
Expand Down Expand Up @@ -511,8 +513,9 @@ def _on_insert_done(idx: int = task_idx) -> None:
task_id=task.get_task_id(),
)

if self._shuffle_bar is not None:
self._shuffle_bar.update(total=self._next_block_idx)
self._sub_progress_updaters[self.GPU_SHUFFLE_PROGRESS_NAME].update(
total=self._next_block_idx
)

def _is_inserting_done(self) -> bool:
return self._inputs_complete and len(self._insert_tasks) == 0
Expand Down Expand Up @@ -589,7 +592,7 @@ def _on_bundle_ready(bundle: RefBundle) -> None:
self._estimated_output_num_rows = num_rows

# Update the "GPU Reduce" sub-progress
self._reduce_bar.update(
self._sub_progress_updaters[self.GPU_REDUCE_PROGRESS_NAME].update(
increment=bundle.num_rows() or 0, total=self.num_output_rows_total()
)

Expand Down Expand Up @@ -693,17 +696,14 @@ def get_actor_info(self):
)

# ------------------------------------------------------------------
# SubProgressBarMixin
# SubProgressMixin
# ------------------------------------------------------------------

def get_sub_progress_bar_names(self) -> List[str]:
return ["GPU Shuffle", "GPU Reduce"]
def get_sub_progress_metrics(self) -> Dict[str, ProgressMetrics]:
"""Return the sub-progress metrics created in __init__ for the GPU shuffle and GPU reduce names."""
return self._sub_progress_metrics

def set_sub_progress_bar(self, name: str, pg: "BaseProgressBar") -> None:
if name == "GPU Shuffle":
self._shuffle_bar = pg
elif name == "GPU Reduce":
self._reduce_bar = pg
def get_sub_progress_updaters(self):
"""Return the sub-progress updaters created in __init__ for the GPU shuffle and GPU reduce names."""
return self._sub_progress_updaters

# ------------------------------------------------------------------
# Stats
Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/_internal/planner/aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def fn(
else:
# Use same number of output partitions.
num_outputs = num_mappers
sample_bar = ctx.sub_progress_bar_dict[
sample_bar = ctx.sub_progress_updaters[
SortTaskSpec.SORT_SAMPLE_SUB_PROGRESS_BAR_NAME
]
# Sample boundaries for aggregate key.
Expand Down
Loading
Loading