diff --git a/python/ray/data/_internal/execution/interfaces/task_context.py b/python/ray/data/_internal/execution/interfaces/task_context.py index 0b109992812e..df8f7186ca7a 100644 --- a/python/ray/data/_internal/execution/interfaces/task_context.py +++ b/python/ray/data/_internal/execution/interfaces/task_context.py @@ -5,7 +5,10 @@ if TYPE_CHECKING: from ray.data._internal.execution.operators.map_transformer import MapTransformer - from ray.data._internal.progress.base_progress import BaseProgressBar + from ray.data._internal.progress.base_progress import ( + ProgressMetrics, + SubProgressUpdater, + ) _thread_local = threading.local() @@ -22,10 +25,13 @@ class TaskContext: # Name of the operator that this task belongs to. op_name: str - # The dictionary of sub progress bar to update. The key is name of sub progress - # bar. Note this is only used on driver side. + # Immutable sub-progress snapshots keyed by sub-progress name. + # Note this is only used on the driver side. # TODO(chengsu): clean it up from TaskContext with new optimizer framework. - sub_progress_bar_dict: Optional[Dict[str, "BaseProgressBar"]] = None + sub_progress_metrics: Optional[Dict[str, "ProgressMetrics"]] = None + + # Driver-side helpers that replace the immutable sub-progress snapshots with updated copies. + sub_progress_updaters: Optional[Dict[str, "SubProgressUpdater"]] = None # NOTE(hchen): `upstream_map_transformer` and `upstream_map_ray_remote_args` # are only used for `RandomShuffle`. DO NOT use them for other operators. 
diff --git a/python/ray/data/_internal/execution/operators/base_physical_operator.py b/python/ray/data/_internal/execution/operators/base_physical_operator.py index e34085cfd02c..3e69fc93d77c 100644 --- a/python/ray/data/_internal/execution/operators/base_physical_operator.py +++ b/python/ray/data/_internal/execution/operators/base_physical_operator.py @@ -11,12 +11,12 @@ RefBundle, TaskContext, ) -from ray.data._internal.execution.operators.sub_progress import SubProgressBarMixin +from ray.data._internal.execution.operators.sub_progress import SubProgressMixin from ray.data._internal.stats import StatsDict from ray.data.context import DataContext if typing.TYPE_CHECKING: - from ray.data._internal.progress.base_progress import BaseProgressBar + from ray.data._internal.progress.base_progress import ProgressMetrics class InternalQueueOperatorMixin(PhysicalOperator, abc.ABC): @@ -100,9 +100,7 @@ def input_dependency(self) -> PhysicalOperator: return self.input_dependencies[0] -class AllToAllOperator( - InternalQueueOperatorMixin, SubProgressBarMixin, PhysicalOperator -): +class AllToAllOperator(InternalQueueOperatorMixin, SubProgressMixin, PhysicalOperator): """A blocking operator that executes once its inputs are complete. This operator implements distributed sort / shuffle operations, etc. @@ -135,8 +133,17 @@ def __init__( self._next_task_index = 0 self._num_outputs = num_outputs self._output_rows = 0 + # Keep the legacy attribute name during the transition. Some internal + # call sites still read this field directly instead of using the mixin. 
self._sub_progress_bar_names = sub_progress_bar_names - self._sub_progress_bar_dict = None + ( + self._sub_progress_metrics, + self._sub_progress_updaters, + ) = ( + self._create_sub_progress_state(sub_progress_bar_names) + if sub_progress_bar_names is not None + else (None, None) + ) self._input_buffer: FIFOBundleQueue = FIFOBundleQueue() self._output_buffer: FIFOBundleQueue = FIFOBundleQueue() self._stats: StatsDict = {} @@ -176,7 +183,8 @@ def all_inputs_done(self) -> None: ctx = TaskContext( task_idx=self._next_task_index, op_name=self.name, - sub_progress_bar_dict=self._sub_progress_bar_dict, + sub_progress_metrics=self._sub_progress_metrics, + sub_progress_updaters=self._sub_progress_updaters, target_max_block_size_override=self.target_max_block_size_override, ) # NOTE: We don't account object store memory use from intermediate `bulk_fn` @@ -213,13 +221,11 @@ def get_transformation_fn(self) -> AllToAllTransformFn: def progress_str(self) -> str: return f"{self.num_output_rows_total() or 0} rows output" - def get_sub_progress_bar_names(self) -> Optional[List[str]]: - return self._sub_progress_bar_names + def get_sub_progress_metrics(self) -> Optional[dict[str, "ProgressMetrics"]]: + return self._sub_progress_metrics - def set_sub_progress_bar(self, name: str, pg: "BaseProgressBar"): - if self._sub_progress_bar_dict is None: - self._sub_progress_bar_dict = {} - self._sub_progress_bar_dict[name] = pg + def get_sub_progress_updaters(self): + return self._sub_progress_updaters def supports_fusion(self): return True diff --git a/python/ray/data/_internal/execution/operators/hash_shuffle.py b/python/ray/data/_internal/execution/operators/hash_shuffle.py index 0c16b7151e5a..4b6b95c6ff3a 100644 --- a/python/ray/data/_internal/execution/operators/hash_shuffle.py +++ b/python/ray/data/_internal/execution/operators/hash_shuffle.py @@ -8,7 +8,6 @@ import random import threading import time -import typing from collections import defaultdict, deque from dataclasses import 
dataclass from typing import ( @@ -54,9 +53,10 @@ TaskExecDriverStats, estimate_total_num_of_blocks, ) -from ray.data._internal.execution.operators.sub_progress import SubProgressBarMixin +from ray.data._internal.execution.operators.sub_progress import SubProgressMixin from ray.data._internal.logical.interfaces import LogicalOperator from ray.data._internal.output_buffer import BlockOutputBuffer, OutputBlockSizeOption +from ray.data._internal.progress.base_progress import ProgressMetrics from ray.data._internal.stats import OpRuntimeMetrics from ray.data._internal.table_block import TableBlockAccessor from ray.data._internal.util import GiB, MiB @@ -77,9 +77,6 @@ DataContext, ) -if typing.TYPE_CHECKING: - from ray.data._internal.progress.base_progress import BaseProgressBar - logger = logging.getLogger(__name__) @@ -468,7 +465,7 @@ def _derive_max_shuffle_aggregators( ) -class HashShufflingOperatorBase(PhysicalOperator, SubProgressBarMixin): +class HashShufflingOperatorBase(PhysicalOperator, SubProgressMixin): """Physical operator base-class for any operators requiring hash-based shuffling. 
@@ -647,10 +644,11 @@ def __init__( self._health_monitoring_start_time: float = 0.0 self._pending_aggregators_refs: Optional[List[ObjectRef[ActorHandle]]] = None - # sub-progress bar initializations - self._shuffle_bar = None + ( + self._sub_progress_metrics, + self._sub_progress_updaters, + ) = self._create_sub_progress_state([self.shuffle_name, self.reduce_name]) self._shuffle_metrics = OpRuntimeMetrics(self) - self._reduce_bar = None self._reduce_metrics = OpRuntimeMetrics(self) def start(self, options: ExecutionOptions) -> None: @@ -771,7 +769,9 @@ def _on_partitioning_done(cur_shuffle_task_idx: int): ) # Update Shuffle progress bar - self._shuffle_bar.update(increment=input_block_metadata.num_rows or 0) + self._sub_progress_updaters[self.shuffle_name].update( + increment=input_block_metadata.num_rows or 0 + ) # TODO update metrics task = self._shuffling_tasks[input_index][ @@ -807,7 +807,7 @@ def _on_partitioning_done(cur_shuffle_task_idx: int): self._shuffle_metrics, total_num_tasks=None, ) - self._shuffle_bar.update(total=num_rows) + self._sub_progress_updaters[self.shuffle_name].update(total=num_rows) def has_next(self) -> bool: self._try_finalize() @@ -881,7 +881,7 @@ def _on_bundle_ready(partition_id: int, bundle: RefBundle): self._estimated_output_num_rows = num_rows # Update Finalize progress bar - self._reduce_bar.update( + self._sub_progress_updaters[self.reduce_name].update( increment=bundle.num_rows() or 0, total=self.num_output_rows_total() ) @@ -1280,14 +1280,11 @@ def _get_min_max_partition_shards_compaction_thresholds( ) -> Optional[Tuple[int, int]]: return None - def get_sub_progress_bar_names(self) -> Optional[List[str]]: - return [self.shuffle_name, self.reduce_name] + def get_sub_progress_metrics(self) -> Optional[Dict[str, ProgressMetrics]]: + return self._sub_progress_metrics - def set_sub_progress_bar(self, name: str, pg: "BaseProgressBar"): - if self.shuffle_name == name: - self._shuffle_bar = pg - elif self.reduce_name == name: - 
self._reduce_bar = pg + def get_sub_progress_updaters(self): + return self._sub_progress_updaters class HashShuffleOperator(HashShufflingOperatorBase): diff --git a/python/ray/data/_internal/execution/operators/sub_progress.py b/python/ray/data/_internal/execution/operators/sub_progress.py index 578407357718..afb63b64123d 100644 --- a/python/ray/data/_internal/execution/operators/sub_progress.py +++ b/python/ray/data/_internal/execution/operators/sub_progress.py @@ -1,31 +1,48 @@ import typing from abc import ABC, abstractmethod -from typing import List, Optional +from typing import Dict, List, Optional, Tuple if typing.TYPE_CHECKING: - from ray.data._internal.progress.base_progress import BaseProgressBar + from ray.data._internal.progress.base_progress import ( + ProgressMetrics, + SubProgressUpdater, + ) -class SubProgressBarMixin(ABC): - """Abstract class for operators that support sub-progress bars""" +class SubProgressMixin(ABC): + """Abstract class for operators that support driver-side sub-progress tracking.""" @abstractmethod - def get_sub_progress_bar_names(self) -> Optional[List[str]]: + def get_sub_progress_metrics(self) -> Optional[Dict[str, "ProgressMetrics"]]: """ - Returns list of sub-progress bar names - - This is used to create the sub-progress bars in the progress manager. - Note that sub-progress bars will be created in the order returned by - this method. + Returns sub-progress metrics keyed by sub-progress name. """ ... @abstractmethod - def set_sub_progress_bar(self, name: str, pg: "BaseProgressBar"): - """ - Sets sub-progress bars - - name: name of sub-progress bar - pg: a progress bar. Can be sub-progress bars for rich, tqdm, etc. - """ + def get_sub_progress_updaters(self) -> Optional[Dict[str, "SubProgressUpdater"]]: + """Returns driver-side helpers for mutating sub-progress metrics.""" ... 
+ + @classmethod + def _create_sub_progress_state( + cls, names: List[str] + ) -> Tuple[Dict[str, "ProgressMetrics"], Dict[str, "SubProgressUpdater"]]: + from ray.data._internal.progress.base_progress import ( + BaseExecutionProgressManager, + ProgressMetrics, + SubProgressUpdater, + ) + + metrics = { + name: ProgressMetrics(name=name, total=None, completed=0) for name in names + } + updaters = { + name: SubProgressUpdater( + metrics, + name=name, + max_name_length=BaseExecutionProgressManager.MAX_NAME_LENGTH, + ) + for name in names + } + return metrics, updaters diff --git a/python/ray/data/_internal/gpu_shuffle/hash_shuffle.py b/python/ray/data/_internal/gpu_shuffle/hash_shuffle.py index 0c642ac5f8ff..eb205d630876 100644 --- a/python/ray/data/_internal/gpu_shuffle/hash_shuffle.py +++ b/python/ray/data/_internal/gpu_shuffle/hash_shuffle.py @@ -2,7 +2,6 @@ import logging import pickle import time -import typing from typing import ( Dict, Iterator, @@ -31,15 +30,12 @@ from ray.data._internal.execution.operators.hash_shuffle import ( _get_total_cluster_resources, ) -from ray.data._internal.execution.operators.sub_progress import SubProgressBarMixin +from ray.data._internal.execution.operators.sub_progress import SubProgressMixin +from ray.data._internal.progress.base_progress import ProgressMetrics from ray.data._internal.stats import OpRuntimeMetrics from ray.data.block import Block, BlockAccessor, BlockStats, to_stats from ray.data.context import DataContext -if typing.TYPE_CHECKING: - - from ray.data._internal.progress.base_progress import BaseProgressBar - logger = logging.getLogger(__name__) # Arrow schema metadata key for the rapidsmpf partition ID. 
@@ -398,7 +394,7 @@ def _derive_num_gpu_ranks(data_context: DataContext) -> int: # --------------------------------------------------------------------------- -class GPUShuffleOperator(PhysicalOperator, SubProgressBarMixin): +class GPUShuffleOperator(PhysicalOperator, SubProgressMixin): """GPU-native shuffle operator using RAPIDS MPF + UCXX. Unlike the CPU ``HashShuffleOperator``, this operator: @@ -421,6 +417,9 @@ class GPUShuffleOperator(PhysicalOperator, SubProgressBarMixin): single call. """ + GPU_SHUFFLE_PROGRESS_NAME = "GPU Shuffle" + GPU_REDUCE_PROGRESS_NAME = "GPU Reduce" + def __init__( self, input_op: PhysicalOperator, @@ -469,9 +468,12 @@ def __init__( self._shuffled_blocks_stats: List[BlockStats] = [] self._output_blocks_stats: List[BlockStats] = [] - # Progress bars (populated by SubProgressBarMixin callbacks) - self._shuffle_bar = None - self._reduce_bar = None + ( + self._sub_progress_metrics, + self._sub_progress_updaters, + ) = self._create_sub_progress_state( + [self.GPU_SHUFFLE_PROGRESS_NAME, self.GPU_REDUCE_PROGRESS_NAME] + ) # Metrics self._shuffle_metrics = OpRuntimeMetrics(self) @@ -511,8 +513,9 @@ def _on_insert_done(idx: int = task_idx) -> None: task_id=task.get_task_id(), ) - if self._shuffle_bar is not None: - self._shuffle_bar.update(total=self._next_block_idx) + self._sub_progress_updaters[self.GPU_SHUFFLE_PROGRESS_NAME].update( + total=self._next_block_idx + ) def _is_inserting_done(self) -> bool: return self._inputs_complete and len(self._insert_tasks) == 0 @@ -589,7 +592,7 @@ def _on_bundle_ready(bundle: RefBundle) -> None: self._estimated_output_num_rows = num_rows # Update Finalize progress bar - self._reduce_bar.update( + self._sub_progress_updaters[self.GPU_REDUCE_PROGRESS_NAME].update( increment=bundle.num_rows() or 0, total=self.num_output_rows_total() ) @@ -693,17 +696,14 @@ def get_actor_info(self): ) # ------------------------------------------------------------------ - # SubProgressBarMixin + # SubProgressMixin # 
------------------------------------------------------------------ - def get_sub_progress_bar_names(self) -> List[str]: - return ["GPU Shuffle", "GPU Reduce"] + def get_sub_progress_metrics(self) -> Dict[str, ProgressMetrics]: + return self._sub_progress_metrics - def set_sub_progress_bar(self, name: str, pg: "BaseProgressBar") -> None: - if name == "GPU Shuffle": - self._shuffle_bar = pg - elif name == "GPU Reduce": - self._reduce_bar = pg + def get_sub_progress_updaters(self): + return self._sub_progress_updaters # ------------------------------------------------------------------ # Stats diff --git a/python/ray/data/_internal/planner/aggregate.py b/python/ray/data/_internal/planner/aggregate.py index 199382e226f0..f9631801448f 100644 --- a/python/ray/data/_internal/planner/aggregate.py +++ b/python/ray/data/_internal/planner/aggregate.py @@ -67,7 +67,7 @@ def fn( else: # Use same number of output partitions. num_outputs = num_mappers - sample_bar = ctx.sub_progress_bar_dict[ + sample_bar = ctx.sub_progress_updaters[ SortTaskSpec.SORT_SAMPLE_SUB_PROGRESS_BAR_NAME ] # Sample boundaries for aggregate key. 
diff --git a/python/ray/data/_internal/planner/exchange/pull_based_shuffle_task_scheduler.py b/python/ray/data/_internal/planner/exchange/pull_based_shuffle_task_scheduler.py index 262382b2a08e..91881cc4d1cb 100644 --- a/python/ray/data/_internal/planner/exchange/pull_based_shuffle_task_scheduler.py +++ b/python/ray/data/_internal/planner/exchange/pull_based_shuffle_task_scheduler.py @@ -76,10 +76,10 @@ def execute( shuffle_map = cached_remote_fn(self._exchange_spec.map) shuffle_reduce = cached_remote_fn(self._exchange_spec.reduce) - sub_progress_bar_dict = task_ctx.sub_progress_bar_dict + sub_progress_updaters = task_ctx.sub_progress_updaters bar_name = ExchangeTaskSpec.MAP_SUB_PROGRESS_BAR_NAME - assert bar_name in sub_progress_bar_dict, sub_progress_bar_dict - map_bar = sub_progress_bar_dict[bar_name] + assert bar_name in sub_progress_updaters, sub_progress_updaters + map_bar = sub_progress_updaters[bar_name] if _debug_limit_execution_to_num_blocks is not None: input_blocks_list = input_blocks_list[:_debug_limit_execution_to_num_blocks] @@ -111,8 +111,8 @@ def execute( self.warn_on_high_local_memory_store_usage() bar_name = ExchangeTaskSpec.REDUCE_SUB_PROGRESS_BAR_NAME - assert bar_name in sub_progress_bar_dict, sub_progress_bar_dict - reduce_bar = sub_progress_bar_dict[bar_name] + assert bar_name in sub_progress_updaters, sub_progress_updaters + reduce_bar = sub_progress_updaters[bar_name] if _debug_limit_execution_to_num_blocks is not None: output_num_blocks = _debug_limit_execution_to_num_blocks diff --git a/python/ray/data/_internal/planner/exchange/push_based_shuffle_task_scheduler.py b/python/ray/data/_internal/planner/exchange/push_based_shuffle_task_scheduler.py index af0c26f96caa..c059e9cacc49 100644 --- a/python/ray/data/_internal/planner/exchange/push_based_shuffle_task_scheduler.py +++ b/python/ray/data/_internal/planner/exchange/push_based_shuffle_task_scheduler.py @@ -534,10 +534,10 @@ def merge(*args, **kwargs): [output_num_blocks, 
stage.merge_schedule, *self._exchange_spec._map_args], ) - sub_progress_bar_dict = task_ctx.sub_progress_bar_dict + sub_progress_updaters = task_ctx.sub_progress_updaters bar_name = ExchangeTaskSpec.MAP_SUB_PROGRESS_BAR_NAME - assert bar_name in sub_progress_bar_dict, sub_progress_bar_dict - map_bar = sub_progress_bar_dict[bar_name] + assert bar_name in sub_progress_updaters, sub_progress_updaters + map_bar = sub_progress_updaters[bar_name] map_stage_executor = _PipelinedStageExecutor( map_stage_iter, stage.num_map_tasks_per_round, progress_bar=map_bar ) @@ -586,8 +586,8 @@ def merge(*args, **kwargs): # Execute and wait for the reduce stage. bar_name = ExchangeTaskSpec.REDUCE_SUB_PROGRESS_BAR_NAME - assert bar_name in sub_progress_bar_dict, sub_progress_bar_dict - reduce_bar = sub_progress_bar_dict[bar_name] + assert bar_name in sub_progress_updaters, sub_progress_updaters + reduce_bar = sub_progress_updaters[bar_name] shuffle_reduce = cached_remote_fn(self._exchange_spec.reduce) reduce_stage_iter = _ReduceStageIterator( diff --git a/python/ray/data/_internal/planner/exchange/split_repartition_task_scheduler.py b/python/ray/data/_internal/planner/exchange/split_repartition_task_scheduler.py index 1c3a563ef694..5d42f4d96aed 100644 --- a/python/ray/data/_internal/planner/exchange/split_repartition_task_scheduler.py +++ b/python/ray/data/_internal/planner/exchange/split_repartition_task_scheduler.py @@ -82,10 +82,10 @@ def execute( split_block_refs.append(b) split_metadata.extend(m) - sub_progress_bar_dict = ctx.sub_progress_bar_dict + sub_progress_updaters = ctx.sub_progress_updaters bar_name = ShuffleTaskSpec.SPLIT_REPARTITION_SUB_PROGRESS_BAR_NAME - assert bar_name in sub_progress_bar_dict, sub_progress_bar_dict - reduce_bar = sub_progress_bar_dict[bar_name] + assert bar_name in sub_progress_updaters, sub_progress_updaters + reduce_bar = sub_progress_updaters[bar_name] reduce_task = cached_remote_fn(self._exchange_spec.reduce) reduce_return = [ diff --git 
a/python/ray/data/_internal/planner/sort.py b/python/ray/data/_internal/planner/sort.py index 852154c66c36..3c614bb821da 100644 --- a/python/ray/data/_internal/planner/sort.py +++ b/python/ray/data/_internal/planner/sort.py @@ -45,7 +45,7 @@ def fn( # Sample boundaries for sort key. if not sort_key.boundaries: - sample_bar = ctx.sub_progress_bar_dict[ + sample_bar = ctx.sub_progress_updaters[ SortTaskSpec.SORT_SAMPLE_SUB_PROGRESS_BAR_NAME ] boundaries = SortTaskSpec.sample_boundaries( diff --git a/python/ray/data/_internal/progress/base_progress.py b/python/ray/data/_internal/progress/base_progress.py index 17c83be7f5f3..26cb598e8c55 100644 --- a/python/ray/data/_internal/progress/base_progress.py +++ b/python/ray/data/_internal/progress/base_progress.py @@ -2,10 +2,10 @@ import threading import typing from abc import ABC, abstractmethod -from typing import Any, List, Optional +from dataclasses import dataclass +from typing import Any, Callable, Dict, List, Optional import ray -from ray.data._internal.execution.operators.sub_progress import SubProgressBarMixin from ray.data._internal.progress.utils import truncate_operator_name if typing.TYPE_CHECKING: @@ -127,8 +127,8 @@ def __init__( verbose_progress: bool, ): """Initialize the progress manager, create all necessary progress bars - and sub-progress bars for the given topology. Sub-progress bars are - created for operators that implement the SubProgressBarMixin. + and sub-progress trackers for the given topology. Concrete sub-progress bars + are created only for operators that implement the SubProgressMixin. Args: dataset_id: id of Dataset @@ -192,12 +192,36 @@ def update_operator_progress( ... 
-class NoopSubProgressBar(BaseProgressBar): - """Sub-Progress Bar for Noop (Disabled) Progress Manager""" +@dataclass(frozen=True) +class ProgressMetrics: + """Immutable snapshot of sub-progress state exposed to progress managers.""" - def __init__(self, name: str, max_name_length: int): + name: str + total: Optional[int] + completed: int + + +class SubProgressUpdater(BaseProgressBar): + """Driver-side helper that updates immutable sub-progress metrics.""" + + def __init__( + self, + metrics_by_name: Dict[str, ProgressMetrics], + name: str, + max_name_length: int, + ): + self._metrics_by_name = metrics_by_name + self._name = name self._max_name_length = max_name_length self._desc = truncate_operator_name(name, self._max_name_length) + self._lock = threading.Lock() + self._update_callbacks: List[Callable[[ProgressMetrics], None]] = [] + + def add_update_callback(self, callback: Callable[[ProgressMetrics], None]) -> None: + with self._lock: + self._update_callbacks.append(callback) + metrics = self._metrics_by_name[self._name] + callback(metrics) def set_description(self, name: str) -> None: self._desc = truncate_operator_name(name, self._max_name_length) @@ -206,13 +230,28 @@ def get_description(self) -> str: return self._desc def update(self, increment: int = 0, total: Optional[int] = None) -> None: - pass + with self._lock: + metrics = self._metrics_by_name[self._name] + new_total = total if total is not None else metrics.total + new_completed = metrics.completed + increment + updated_metrics = ProgressMetrics( + name=metrics.name, + total=new_total, + completed=new_completed, + ) + self._metrics_by_name[self._name] = updated_metrics + callbacks = list(self._update_callbacks) + for callback in callbacks: + callback(updated_metrics) - def refresh(self): - pass - def close(self): - pass +def make_sub_progress_sync_callback( + display_bar: Any, +) -> Callable[[ProgressMetrics], None]: + def sync_display(metrics: ProgressMetrics) -> None: + 
display_bar.update_absolute(metrics.completed, metrics.total) + + return sync_display class NoopExecutionProgressManager(BaseExecutionProgressManager): @@ -225,17 +264,7 @@ def __init__( show_op_progress: bool, verbose_progress: bool, ): - for state in topology.values(): - op = state.op - if not isinstance(op, SubProgressBarMixin): - continue - sub_pg_names = op.get_sub_progress_bar_names() - if sub_pg_names is not None: - for name in sub_pg_names: - pg = NoopSubProgressBar( - name=name, max_name_length=self.MAX_NAME_LENGTH - ) - op.set_sub_progress_bar(name, pg) + pass def start(self) -> None: pass diff --git a/python/ray/data/_internal/progress/logging_progress.py b/python/ray/data/_internal/progress/logging_progress.py index 6530c7202315..95316e8734fa 100644 --- a/python/ray/data/_internal/progress/logging_progress.py +++ b/python/ray/data/_internal/progress/logging_progress.py @@ -7,15 +7,11 @@ from ray._common.utils import env_integer from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer -from ray.data._internal.execution.operators.sub_progress import SubProgressBarMixin +from ray.data._internal.execution.operators.sub_progress import SubProgressMixin from ray.data._internal.execution.streaming_executor_state import ( format_op_state_summary, ) -from ray.data._internal.progress.base_progress import ( - BaseExecutionProgressManager, - BaseProgressBar, - NoopSubProgressBar, -) +from ray.data._internal.progress.base_progress import BaseExecutionProgressManager from ray.data._internal.progress.utils import truncate_operator_name if typing.TYPE_CHECKING: @@ -34,54 +30,6 @@ class _LoggingMetrics: total: Optional[int] -class LoggingSubProgressBar(BaseProgressBar): - """Thin wrapper to provide identical interface to the ProgressBar. - - Internally passes relevant logging metrics to `LoggingExecutionProgressManager`. 
- Sub-progress is actually handled by Ray through operators, while operator-level - and total progress is handled by the `StreamingExecutor`. To ensure log-order, - this class helps to pass metric data to the progress manager so progress metrics - are logged centrally. - """ - - def __init__( - self, - name: str, - total: Optional[int] = None, - max_name_length: int = 100, - ): - """Initialize sub-progress bar - - Args: - name: name of sub-progress bar - total: total number of output rows. None for unknown. - max_name_length: maximum operator name length (unused). - """ - del max_name_length # unused - self._total = total - self._completed = 0 - self._name = name - - def set_description(self, name: str) -> None: - pass # unused - - def get_description(self) -> str: - return "" # unused - - def update(self, increment: int = 0, total: Optional[int] = None): - if total is not None: - self._total = total - self._completed += increment - - def get_logging_metrics(self) -> _LoggingMetrics: - return _LoggingMetrics( - name=f" - {self._name}", - desc=None, - completed=self._completed, - total=self._total, - ) - - class LoggingExecutionProgressManager(BaseExecutionProgressManager): """Execution progress display for non-tty situations, preventing spamming of progress reporting.""" @@ -116,9 +64,7 @@ def __init__( name="Total Progress", desc=None, completed=0, total=None ) self._op_progress_metrics: Dict["OpState", _LoggingMetrics] = {} - self._sub_progress_metrics: Dict[ - "OpState", List[LoggingSubProgressBar] - ] = defaultdict(list) + self._sub_progress_names: Dict["OpState", List[str]] = defaultdict(list) for state in self._topology.values(): op = state.op @@ -126,7 +72,7 @@ def __init__( continue total = op.num_output_rows_total() or 1 - contains_sub_progress_bars = isinstance(op, SubProgressBarMixin) + contains_sub_progress_bars = isinstance(op, SubProgressMixin) sub_progress_bar_enabled = show_op_progress and ( contains_sub_progress_bars or verbose_progress ) @@ 
-142,20 +88,12 @@ def __init__( if not contains_sub_progress_bars: continue - sub_pg_names = op.get_sub_progress_bar_names() - if sub_pg_names is None: + sub_progress_metrics = op.get_sub_progress_metrics() + if sub_progress_metrics is None: continue - for name in sub_pg_names: + for name in sub_progress_metrics: if sub_progress_bar_enabled: - pg = LoggingSubProgressBar( - name=name, total=total, max_name_length=self.MAX_NAME_LENGTH - ) - self._sub_progress_metrics[state].append(pg) - else: - pg = NoopSubProgressBar( - name=name, max_name_length=self.MAX_NAME_LENGTH - ) - op.set_sub_progress_bar(name, pg) + self._sub_progress_names[state].append(name) # Management def start(self): @@ -185,8 +123,22 @@ def refresh(self): if metrics is None: continue _log_op_or_sub_progress(metrics) - for pg in self._sub_progress_metrics[opstate]: - _log_op_or_sub_progress(pg.get_logging_metrics()) + if isinstance(opstate.op, SubProgressMixin): + sub_progress_metrics = opstate.op.get_sub_progress_metrics() + if sub_progress_metrics is None: + continue + for name in self._sub_progress_names[opstate]: + metrics = sub_progress_metrics.get(name) + if metrics is None: + continue + _log_op_or_sub_progress( + _LoggingMetrics( + name=f" - {metrics.name}", + desc=None, + completed=metrics.completed, + total=metrics.total, + ) + ) # finish logging logger.info(lastline) diff --git a/python/ray/data/_internal/progress/rich_progress.py b/python/ray/data/_internal/progress/rich_progress.py index d66fac911ab5..cb547ccfb270 100644 --- a/python/ray/data/_internal/progress/rich_progress.py +++ b/python/ray/data/_internal/progress/rich_progress.py @@ -20,14 +20,14 @@ from rich.text import Text from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer -from ray.data._internal.execution.operators.sub_progress import SubProgressBarMixin +from ray.data._internal.execution.operators.sub_progress import SubProgressMixin from ray.data._internal.execution.streaming_executor_state 
import ( format_op_state_summary, ) from ray.data._internal.progress.base_progress import ( BaseExecutionProgressManager, BaseProgressBar, - NoopSubProgressBar, + make_sub_progress_sync_callback, ) from ray.data._internal.progress.utils import truncate_operator_name @@ -107,6 +107,12 @@ def update(self, increment: int = 0, total: Optional[int] = None) -> None: self._completed += increment self._update(self._completed, self._total) + def update_absolute(self, completed: int, total: Optional[int] = None) -> None: + if self._enabled: + self._completed = completed + self._total = total + self._update(self._completed, self._total) + def complete(self) -> None: if self._enabled: self._update(self._completed, self._completed) @@ -179,7 +185,7 @@ def _setup_operator_progress(self, topology: "Topology"): if isinstance(op, InputDataBuffer): continue - contains_sub_progress_bars = isinstance(op, SubProgressBarMixin) + contains_sub_progress_bars = isinstance(op, SubProgressMixin) sub_progress_bar_enabled = self._show_op_progress and ( contains_sub_progress_bars or self._verbose_progress ) @@ -203,37 +209,43 @@ def _setup_operator_progress(self, topology: "Topology"): if not contains_sub_progress_bars: continue - sub_progress_bar_names = op.get_sub_progress_bar_names() - if sub_progress_bar_names is None: + sub_progress_metrics = op.get_sub_progress_metrics() + if sub_progress_metrics is None: continue + sub_progress_updaters = op.get_sub_progress_updaters() - for name in sub_progress_bar_names: + for name, metrics in sub_progress_metrics.items(): if sub_progress_bar_enabled: progress = self._make_progress_bar( _TREE_VERTICAL_SUB_PROGRESS, "", 10 ) - total = state.op.num_output_rows_total() tid = progress.add_task( name, - total=total if total is not None else 1, + total=metrics.total if metrics.total is not None else 1, start=True, rate_str="? 
rows/s", count_str="0/?", ) rows.append(progress) - pg = RichSubProgressBar( + display_pg = RichSubProgressBar( name=name, - total=total, + total=metrics.total, progress=progress, tid=tid, max_name_length=self.MAX_NAME_LENGTH, ) else: - pg = NoopSubProgressBar( - name=name, max_name_length=self.MAX_NAME_LENGTH - ) - op.set_sub_progress_bar(name, pg) - self._sub_progress_bars.append(pg) + display_pg = None + if display_pg is not None: + display_pg.update_absolute(metrics.completed, metrics.total) + self._sub_progress_bars.append(display_pg) + if ( + sub_progress_updaters is not None + and name in sub_progress_updaters + ): + sub_progress_updaters[name].add_update_callback( + make_sub_progress_sync_callback(display_pg) + ) if rows: self._layout_table.add_row(Text(f" {_TREE_VERTICAL}", no_wrap=True)) for row in rows: diff --git a/python/ray/data/_internal/progress/tqdm_progress.py b/python/ray/data/_internal/progress/tqdm_progress.py index 42dc8f991393..fa1eab228271 100644 --- a/python/ray/data/_internal/progress/tqdm_progress.py +++ b/python/ray/data/_internal/progress/tqdm_progress.py @@ -3,14 +3,14 @@ from typing import Dict, List, Optional from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer -from ray.data._internal.execution.operators.sub_progress import SubProgressBarMixin +from ray.data._internal.execution.operators.sub_progress import SubProgressMixin from ray.data._internal.execution.streaming_executor_state import ( format_op_state_summary, ) from ray.data._internal.progress.base_progress import ( BaseExecutionProgressManager, BaseProgressBar, - NoopSubProgressBar, + make_sub_progress_sync_callback, ) from ray.data._internal.progress.progress_bar import ProgressBar @@ -81,7 +81,7 @@ def __init__( continue total = op.num_output_rows_total() or 1 - contains_sub_progress_bars = isinstance(op, SubProgressBarMixin) + contains_sub_progress_bars = isinstance(op, SubProgressMixin) sub_progress_bar_enabled = show_op_progress and ( 
contains_sub_progress_bars or verbose_progress ) @@ -102,14 +102,15 @@ def __init__( if not contains_sub_progress_bars: continue - sub_pg_names = op.get_sub_progress_bar_names() - if sub_pg_names is None: + sub_progress_metrics = op.get_sub_progress_metrics() + if sub_progress_metrics is None: continue - for name in sub_pg_names: + sub_progress_updaters = op.get_sub_progress_updaters() + for name, metrics in sub_progress_metrics.items(): if sub_progress_bar_enabled: - pg = TqdmSubProgressBar( + display_pg = TqdmSubProgressBar( name=f" *- {name}", - total=total, + total=metrics.total, unit="row", position=num_progress_bars, max_name_length=self.MAX_NAME_LENGTH, @@ -117,12 +118,17 @@ def __init__( ) num_progress_bars += 1 else: - pg = NoopSubProgressBar( - name=f" *- {name}", - max_name_length=self.MAX_NAME_LENGTH, - ) - op.set_sub_progress_bar(name, pg) - self._sub_progress_bars.append(pg) + display_pg = None + if display_pg is not None: + display_pg.update_absolute(metrics.completed, metrics.total) + self._sub_progress_bars.append(display_pg) + if ( + sub_progress_updaters is not None + and name in sub_progress_updaters + ): + sub_progress_updaters[name].add_update_callback( + make_sub_progress_sync_callback(display_pg) + ) # Management def start(self): diff --git a/python/ray/data/tests/test_gpu_shuffle.py b/python/ray/data/tests/test_gpu_shuffle.py index e5a56fda5e10..72331d0b76a9 100644 --- a/python/ray/data/tests/test_gpu_shuffle.py +++ b/python/ray/data/tests/test_gpu_shuffle.py @@ -319,20 +319,18 @@ def test_incremental_resource_usage_is_one_gpu(self): def test_progress_bar_names(self): op = self._make_op() - names = op.get_sub_progress_bar_names() + names = list(op.get_sub_progress_metrics().keys()) assert names == ["GPU Shuffle", "GPU Reduce"] - def test_set_sub_progress_bar_shuffle(self): + def test_sub_progress_metric_shuffle(self): op = self._make_op() - mock_bar = MagicMock() - op.set_sub_progress_bar("GPU Shuffle", mock_bar) - assert op._shuffle_bar is 
mock_bar + metric = op.get_sub_progress_metrics()["GPU Shuffle"] + assert metric is not None - def test_set_sub_progress_bar_reduce(self): + def test_sub_progress_metric_reduce(self): op = self._make_op() - mock_bar = MagicMock() - op.set_sub_progress_bar("GPU Reduce", mock_bar) - assert op._reduce_bar is mock_bar + metric = op.get_sub_progress_metrics()["GPU Reduce"] + assert metric is not None def test_initial_state(self): op = self._make_op() diff --git a/python/ray/data/tests/test_operators.py b/python/ray/data/tests/test_operators.py index 73fc2b6fb353..7b179def779b 100644 --- a/python/ray/data/tests/test_operators.py +++ b/python/ray/data/tests/test_operators.py @@ -15,7 +15,7 @@ from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer from ray.data._internal.execution.operators.map_operator import MapOperator from ray.data._internal.execution.util import make_ref_bundles -from ray.data._internal.progress.base_progress import NoopSubProgressBar +from ray.data._internal.progress.base_progress import ProgressMetrics from ray.data.block import BlockAccessor from ray.data.context import DataContext from ray.data.tests.util import ( @@ -72,8 +72,8 @@ def test_input_data_buffer(ray_start_regular_shared): def test_all_to_all_operator(): def dummy_all_transform(bundles: List[RefBundle], ctx): - assert len(ctx.sub_progress_bar_dict) == 2 - assert list(ctx.sub_progress_bar_dict.keys()) == ["Test1", "Test2"] + assert len(ctx.sub_progress_metrics) == 2 + assert list(ctx.sub_progress_metrics.keys()) == ["Test1", "Test2"] return make_ref_bundles([[1, 2], [3, 4]]), {"FooStats": []} input_op = InputDataBuffer( @@ -89,13 +89,9 @@ def dummy_all_transform(bundles: List[RefBundle], ctx): name="TestAll", ) - # Initialize progress bar. 
- for name in op.get_sub_progress_bar_names(): - pg = NoopSubProgressBar( - name=name, - max_name_length=100, - ) - op.set_sub_progress_bar(name, pg) + metrics = op.get_sub_progress_metrics() + assert list(metrics.keys()) == ["Test1", "Test2"] + assert all(isinstance(metric, ProgressMetrics) for metric in metrics.values()) # Feed data. op.start(ExecutionOptions()) diff --git a/python/ray/data/tests/test_progress_bar.py b/python/ray/data/tests/test_progress_bar.py index e10ffd176791..e1a837a72e28 100644 --- a/python/ray/data/tests/test_progress_bar.py +++ b/python/ray/data/tests/test_progress_bar.py @@ -6,6 +6,10 @@ from pytest import fixture import ray +from ray.data._internal.progress.base_progress import ( + ProgressMetrics, + SubProgressUpdater, +) from ray.data._internal.progress.progress_bar import ProgressBar @@ -83,6 +87,25 @@ def wrapped_close(): pb.close() +def test_sub_progress_updater_updates_metrics_and_notifies_callback(): + metrics_by_name = { + "Shuffle": ProgressMetrics(name="Shuffle", total=None, completed=0) + } + updater = SubProgressUpdater(metrics_by_name, "Shuffle", max_name_length=100) + snapshots = [] + + updater.add_update_callback(snapshots.append) + updater.update(increment=3, total=10) + + assert metrics_by_name["Shuffle"] == ProgressMetrics( + name="Shuffle", total=10, completed=3 + ) + assert snapshots == [ + ProgressMetrics(name="Shuffle", total=None, completed=0), + ProgressMetrics(name="Shuffle", total=10, completed=3), + ] + + @pytest.mark.parametrize( "name, expected_description, max_line_length, should_emit_warning", [