From 83815696e082ff2f46675e895580c16758f84924 Mon Sep 17 00:00:00 2001 From: 400Ping Date: Wed, 1 Apr 2026 22:59:10 +0800 Subject: [PATCH 1/6] [Data] Decouple sub-progress metrics from progress bar implementations Signed-off-by: 400Ping --- .../execution/interfaces/task_context.py | 30 +++++++- .../operators/base_physical_operator.py | 50 ++++++++----- .../execution/operators/hash_shuffle.py | 55 +++++++++----- .../execution/operators/sub_progress.py | 40 +++++----- .../_internal/gpu_shuffle/hash_shuffle.py | 52 +++++++------ .../ray/data/_internal/planner/aggregate.py | 2 +- .../pull_based_shuffle_task_scheduler.py | 10 +-- .../push_based_shuffle_task_scheduler.py | 10 +-- .../split_repartition_task_scheduler.py | 6 +- python/ray/data/_internal/planner/sort.py | 2 +- .../data/_internal/progress/base_progress.py | 73 ++++++++++++++----- .../_internal/progress/logging_progress.py | 23 +++--- .../data/_internal/progress/rich_progress.py | 23 +++--- .../data/_internal/progress/tqdm_progress.py | 24 +++--- python/ray/data/tests/test_gpu_shuffle.py | 16 ++-- python/ray/data/tests/test_operators.py | 16 ++-- 16 files changed, 264 insertions(+), 168 deletions(-) diff --git a/python/ray/data/_internal/execution/interfaces/task_context.py b/python/ray/data/_internal/execution/interfaces/task_context.py index 0b109992812e..c1383b291bc2 100644 --- a/python/ray/data/_internal/execution/interfaces/task_context.py +++ b/python/ray/data/_internal/execution/interfaces/task_context.py @@ -5,7 +5,10 @@ if TYPE_CHECKING: from ray.data._internal.execution.operators.map_transformer import MapTransformer - from ray.data._internal.progress.base_progress import BaseProgressBar + from ray.data._internal.progress.base_progress import ( + ProgressMetrics, + SubProgressUpdater, + ) _thread_local = threading.local() @@ -22,10 +25,13 @@ class TaskContext: # Name of the operator that this task belongs to. op_name: str - # The dictionary of sub progress bar to update. The key is name of sub progress - # bar. Note this is only used on driver side. + # Immutable sub-progress snapshots keyed by sub-progress name. + # Note this is only used on the driver side. # TODO(chengsu): clean it up from TaskContext with new optimizer framework. - sub_progress_bar_dict: Optional[Dict[str, "BaseProgressBar"]] = None + sub_progress_metrics: Optional[Dict[str, "ProgressMetrics"]] = None + + # Driver-side helpers for mutating the immutable sub-progress snapshots. + sub_progress_updaters: Optional[Dict[str, "SubProgressUpdater"]] = None # NOTE(hchen): `upstream_map_transformer` and `upstream_map_ray_remote_args` # are only used for `RandomShuffle`. DO NOT use them for other operators. @@ -86,3 +92,19 @@ def current(cls, context: "TaskContext") -> Iterator["TaskContext"]: yield context finally: cls.reset_current() + + @property + def sub_progress_bar_dict(self): + return self.sub_progress_updaters + + @sub_progress_bar_dict.setter + def sub_progress_bar_dict(self, value): + self.sub_progress_updaters = value + + @property + def sub_progress_tracker_dict(self): + return self.sub_progress_updaters + + @sub_progress_tracker_dict.setter + def sub_progress_tracker_dict(self, value): + self.sub_progress_updaters = value diff --git a/python/ray/data/_internal/execution/operators/base_physical_operator.py b/python/ray/data/_internal/execution/operators/base_physical_operator.py index e34085cfd02c..f31d78fa0daf 100644 --- a/python/ray/data/_internal/execution/operators/base_physical_operator.py +++ b/python/ray/data/_internal/execution/operators/base_physical_operator.py @@ -1,5 +1,4 @@ import abc -import typing from typing import List, Optional from typing_extensions import override @@ -11,13 +10,14 @@ RefBundle, TaskContext, ) -from ray.data._internal.execution.operators.sub_progress import SubProgressBarMixin +from ray.data._internal.execution.operators.sub_progress import SubProgressMixin +from ray.data._internal.progress.base_progress import ( + ProgressMetrics, + SubProgressUpdater, +) from ray.data._internal.stats import StatsDict from ray.data.context import DataContext -if typing.TYPE_CHECKING: - from ray.data._internal.progress.base_progress import BaseProgressBar - class InternalQueueOperatorMixin(PhysicalOperator, abc.ABC): @property @@ -100,9 +100,7 @@ def input_dependency(self) -> PhysicalOperator: return self.input_dependencies[0] -class AllToAllOperator( - InternalQueueOperatorMixin, SubProgressBarMixin, PhysicalOperator -): +class AllToAllOperator(InternalQueueOperatorMixin, SubProgressMixin, PhysicalOperator): """A blocking operator that executes once its inputs are complete. This operator implements distributed sort / shuffle operations, etc. @@ -135,8 +133,27 @@ def __init__( self._next_task_index = 0 self._num_outputs = num_outputs self._output_rows = 0 - self._sub_progress_bar_names = sub_progress_bar_names - self._sub_progress_bar_dict = None + self._sub_progress_names = sub_progress_bar_names + self._sub_progress_metrics = ( + { + name: ProgressMetrics(name=name, total=None, completed=0) + for name in sub_progress_bar_names + } + if sub_progress_bar_names is not None + else None + ) + self._sub_progress_updaters = ( + { + name: SubProgressUpdater( + self._sub_progress_metrics, + name=name, + max_name_length=100, + ) + for name in sub_progress_bar_names + } + if sub_progress_bar_names is not None + else None + ) self._input_buffer: FIFOBundleQueue = FIFOBundleQueue() self._output_buffer: FIFOBundleQueue = FIFOBundleQueue() self._stats: StatsDict = {} @@ -176,7 +193,8 @@ def all_inputs_done(self) -> None: ctx = TaskContext( task_idx=self._next_task_index, op_name=self.name, - sub_progress_bar_dict=self._sub_progress_bar_dict, + sub_progress_metrics=self._sub_progress_metrics, + sub_progress_updaters=self._sub_progress_updaters, target_max_block_size_override=self.target_max_block_size_override, ) # NOTE: We don't account object store memory use from intermediate `bulk_fn` @@ -213,13 +231,11 @@ def get_transformation_fn(self) -> AllToAllTransformFn: def progress_str(self) -> str: return f"{self.num_output_rows_total() or 0} rows output" - def get_sub_progress_bar_names(self) -> Optional[List[str]]: - return self._sub_progress_bar_names + def get_sub_progress_metrics(self) -> Optional[dict[str, "ProgressMetrics"]]: + return self._sub_progress_metrics - def set_sub_progress_bar(self, name: str, pg: "BaseProgressBar"): - if self._sub_progress_bar_dict is None: - self._sub_progress_bar_dict = {} - self._sub_progress_bar_dict[name] = pg + def get_sub_progress_updaters(self): + return self._sub_progress_updaters def supports_fusion(self): return True diff --git a/python/ray/data/_internal/execution/operators/hash_shuffle.py b/python/ray/data/_internal/execution/operators/hash_shuffle.py index 0c16b7151e5a..0e6a956a7db7 100644 --- a/python/ray/data/_internal/execution/operators/hash_shuffle.py +++ b/python/ray/data/_internal/execution/operators/hash_shuffle.py @@ -8,7 +8,6 @@ import random import threading import time -import typing from collections import defaultdict, deque from dataclasses import dataclass from typing import ( @@ -54,9 +53,13 @@ TaskExecDriverStats, estimate_total_num_of_blocks, ) -from ray.data._internal.execution.operators.sub_progress import SubProgressBarMixin +from ray.data._internal.execution.operators.sub_progress import SubProgressMixin from ray.data._internal.logical.interfaces import LogicalOperator from ray.data._internal.output_buffer import BlockOutputBuffer, OutputBlockSizeOption +from ray.data._internal.progress.base_progress import ( + ProgressMetrics, + SubProgressUpdater, +) from ray.data._internal.stats import OpRuntimeMetrics from ray.data._internal.table_block import TableBlockAccessor from ray.data._internal.util import GiB, MiB @@ -77,9 +80,6 @@ DataContext, ) -if typing.TYPE_CHECKING: - from ray.data._internal.progress.base_progress import BaseProgressBar - logger = logging.getLogger(__name__) @@ -468,7 +468,7 @@ def _derive_max_shuffle_aggregators( ) -class HashShufflingOperatorBase(PhysicalOperator, SubProgressBarMixin): +class HashShufflingOperatorBase(PhysicalOperator, SubProgressMixin): """Physical operator base-class for any operators requiring hash-based shuffling. @@ -647,10 +647,28 @@ def __init__( self._health_monitoring_start_time: float = 0.0 self._pending_aggregators_refs: Optional[List[ObjectRef[ActorHandle]]] = None - # sub-progress bar initializations - self._shuffle_bar = None + # Driver-side sub-progress metrics for manager-owned display state. + self._sub_progress_metrics = { + self.shuffle_name: ProgressMetrics( + name=self.shuffle_name, total=None, completed=0 + ), + self.reduce_name: ProgressMetrics( + name=self.reduce_name, total=None, completed=0 + ), + } + self._sub_progress_updaters = { + self.shuffle_name: SubProgressUpdater( + self._sub_progress_metrics, + name=self.shuffle_name, + max_name_length=100, + ), + self.reduce_name: SubProgressUpdater( + self._sub_progress_metrics, + name=self.reduce_name, + max_name_length=100, + ), + } self._shuffle_metrics = OpRuntimeMetrics(self) - self._reduce_bar = None self._reduce_metrics = OpRuntimeMetrics(self) def start(self, options: ExecutionOptions) -> None: @@ -771,7 +789,9 @@ def _on_partitioning_done(cur_shuffle_task_idx: int): ) # Update Shuffle progress bar - self._shuffle_bar.update(increment=input_block_metadata.num_rows or 0) + self._sub_progress_updaters[self.shuffle_name].update( + increment=input_block_metadata.num_rows or 0 + ) # TODO update metrics task = self._shuffling_tasks[input_index][ @@ -807,7 +827,7 @@ def _on_partitioning_done(cur_shuffle_task_idx: int): self._shuffle_metrics, total_num_tasks=None, ) - self._shuffle_bar.update(total=num_rows) + self._sub_progress_updaters[self.shuffle_name].update(total=num_rows) def has_next(self) -> bool: self._try_finalize() @@ -881,7 +901,7 @@ def _on_bundle_ready(partition_id: int, bundle: RefBundle): self._estimated_output_num_rows = num_rows # Update Finalize progress bar - self._reduce_bar.update( + self._sub_progress_updaters[self.reduce_name].update( increment=bundle.num_rows() or 0, total=self.num_output_rows_total() ) @@ -1280,14 +1300,11 @@ def _get_min_max_partition_shards_compaction_thresholds( ) -> Optional[Tuple[int, int]]: return None - def get_sub_progress_bar_names(self) -> Optional[List[str]]: - return [self.shuffle_name, self.reduce_name] + def get_sub_progress_metrics(self) -> Optional[Dict[str, ProgressMetrics]]: + return self._sub_progress_metrics - def set_sub_progress_bar(self, name: str, pg: "BaseProgressBar"): - if self.shuffle_name == name: - self._shuffle_bar = pg - elif self.reduce_name == name: - self._reduce_bar = pg + def get_sub_progress_updaters(self): + return self._sub_progress_updaters class HashShuffleOperator(HashShufflingOperatorBase): diff --git a/python/ray/data/_internal/execution/operators/sub_progress.py b/python/ray/data/_internal/execution/operators/sub_progress.py index 578407357718..b1148c2525a3 100644 --- a/python/ray/data/_internal/execution/operators/sub_progress.py +++ b/python/ray/data/_internal/execution/operators/sub_progress.py @@ -1,31 +1,35 @@ import typing from abc import ABC, abstractmethod -from typing import List, Optional +from typing import Dict, List, Optional if typing.TYPE_CHECKING: - from ray.data._internal.progress.base_progress import BaseProgressBar + from ray.data._internal.progress.base_progress import ( + ProgressMetrics, + SubProgressUpdater, + ) -class SubProgressBarMixin(ABC): - """Abstract class for operators that support sub-progress bars""" +class SubProgressMixin(ABC): + """Abstract class for operators that support driver-side sub-progress tracking.""" @abstractmethod - def get_sub_progress_bar_names(self) -> Optional[List[str]]: + def get_sub_progress_metrics(self) -> Optional[Dict[str, "ProgressMetrics"]]: """ - Returns list of sub-progress bar names - - This is used to create the sub-progress bars in the progress manager. - Note that sub-progress bars will be created in the order returned by - this method. + Returns sub-progress metrics keyed by sub-progress name. """ ... - @abstractmethod - def set_sub_progress_bar(self, name: str, pg: "BaseProgressBar"): - """ - Sets sub-progress bars + def get_sub_progress_updaters(self) -> Optional[Dict[str, "SubProgressUpdater"]]: + """Returns driver-side helpers for mutating sub-progress metrics.""" + return None - name: name of sub-progress bar - pg: a progress bar. Can be sub-progress bars for rich, tqdm, etc. - """ - ... + # Backward-compatible wrappers during the transition away from "bar" naming. + def get_sub_progress_bar_names(self) -> Optional[List[str]]: + metrics = self.get_sub_progress_metrics() + return list(metrics.keys()) if metrics is not None else None + + def get_sub_progress_names(self) -> Optional[List[str]]: + return self.get_sub_progress_bar_names() + + +SubProgressBarMixin = SubProgressMixin diff --git a/python/ray/data/_internal/gpu_shuffle/hash_shuffle.py b/python/ray/data/_internal/gpu_shuffle/hash_shuffle.py index 0c642ac5f8ff..e19b521b7441 100644 --- a/python/ray/data/_internal/gpu_shuffle/hash_shuffle.py +++ b/python/ray/data/_internal/gpu_shuffle/hash_shuffle.py @@ -2,7 +2,6 @@ import logging import pickle import time -import typing from typing import ( Dict, Iterator, @@ -31,15 +30,15 @@ from ray.data._internal.execution.operators.hash_shuffle import ( _get_total_cluster_resources, ) -from ray.data._internal.execution.operators.sub_progress import SubProgressBarMixin +from ray.data._internal.execution.operators.sub_progress import SubProgressMixin +from ray.data._internal.progress.base_progress import ( + ProgressMetrics, + SubProgressUpdater, +) from ray.data._internal.stats import OpRuntimeMetrics from ray.data.block import Block, BlockAccessor, BlockStats, to_stats from ray.data.context import DataContext -if typing.TYPE_CHECKING: - - from ray.data._internal.progress.base_progress import BaseProgressBar - logger = logging.getLogger(__name__) # Arrow schema metadata key for the rapidsmpf partition ID. @@ -398,7 +397,7 @@ def _derive_num_gpu_ranks(data_context: DataContext) -> int: # --------------------------------------------------------------------------- -class GPUShuffleOperator(PhysicalOperator, SubProgressBarMixin): +class GPUShuffleOperator(PhysicalOperator, SubProgressMixin): """GPU-native shuffle operator using RAPIDS MPF + UCXX. Unlike the CPU ``HashShuffleOperator``, this operator: @@ -469,9 +468,22 @@ def __init__( self._shuffled_blocks_stats: List[BlockStats] = [] self._output_blocks_stats: List[BlockStats] = [] - # Progress bars (populated by SubProgressBarMixin callbacks) - self._shuffle_bar = None - self._reduce_bar = None + self._sub_progress_metrics = { + "GPU Shuffle": ProgressMetrics(name="GPU Shuffle", total=None, completed=0), + "GPU Reduce": ProgressMetrics(name="GPU Reduce", total=None, completed=0), + } + self._sub_progress_updaters = { + "GPU Shuffle": SubProgressUpdater( + self._sub_progress_metrics, + name="GPU Shuffle", + max_name_length=100, + ), + "GPU Reduce": SubProgressUpdater( + self._sub_progress_metrics, + name="GPU Reduce", + max_name_length=100, + ), + } # Metrics self._shuffle_metrics = OpRuntimeMetrics(self) @@ -511,8 +523,9 @@ def _on_insert_done(idx: int = task_idx) -> None: task_id=task.get_task_id(), ) - if self._shuffle_bar is not None: - self._shuffle_bar.update(total=self._next_block_idx) + self._sub_progress_updaters["GPU Shuffle"].update( + total=self._next_block_idx + ) def _is_inserting_done(self) -> bool: return self._inputs_complete and len(self._insert_tasks) == 0 @@ -589,7 +602,7 @@ def _on_bundle_ready(bundle: RefBundle) -> None: self._estimated_output_num_rows = num_rows # Update Finalize progress bar - self._reduce_bar.update( + self._sub_progress_updaters["GPU Reduce"].update( increment=bundle.num_rows() or 0, total=self.num_output_rows_total() ) @@ -693,17 +706,14 @@ def get_actor_info(self): ) # ------------------------------------------------------------------ - # SubProgressBarMixin + # SubProgressMixin # ------------------------------------------------------------------ - def get_sub_progress_bar_names(self) -> List[str]: - return ["GPU Shuffle", "GPU Reduce"] + def get_sub_progress_metrics(self) -> Dict[str, ProgressMetrics]: + return self._sub_progress_metrics - def set_sub_progress_bar(self, name: str, pg: "BaseProgressBar") -> None: - if name == "GPU Shuffle": - self._shuffle_bar = pg - elif name == "GPU Reduce": - self._reduce_bar = pg + def get_sub_progress_updaters(self): + return self._sub_progress_updaters # ------------------------------------------------------------------ # Stats diff --git a/python/ray/data/_internal/planner/aggregate.py b/python/ray/data/_internal/planner/aggregate.py index 199382e226f0..f9631801448f 100644 --- a/python/ray/data/_internal/planner/aggregate.py +++ b/python/ray/data/_internal/planner/aggregate.py @@ -67,7 +67,7 @@ def fn( else: # Use same number of output partitions. num_outputs = num_mappers - sample_bar = ctx.sub_progress_bar_dict[ + sample_bar = ctx.sub_progress_updaters[ SortTaskSpec.SORT_SAMPLE_SUB_PROGRESS_BAR_NAME ] # Sample boundaries for aggregate key. diff --git a/python/ray/data/_internal/planner/exchange/pull_based_shuffle_task_scheduler.py b/python/ray/data/_internal/planner/exchange/pull_based_shuffle_task_scheduler.py index 262382b2a08e..91881cc4d1cb 100644 --- a/python/ray/data/_internal/planner/exchange/pull_based_shuffle_task_scheduler.py +++ b/python/ray/data/_internal/planner/exchange/pull_based_shuffle_task_scheduler.py @@ -76,10 +76,10 @@ def execute( shuffle_map = cached_remote_fn(self._exchange_spec.map) shuffle_reduce = cached_remote_fn(self._exchange_spec.reduce) - sub_progress_bar_dict = task_ctx.sub_progress_bar_dict + sub_progress_updaters = task_ctx.sub_progress_updaters bar_name = ExchangeTaskSpec.MAP_SUB_PROGRESS_BAR_NAME - assert bar_name in sub_progress_bar_dict, sub_progress_bar_dict - map_bar = sub_progress_bar_dict[bar_name] + assert bar_name in sub_progress_updaters, sub_progress_updaters + map_bar = sub_progress_updaters[bar_name] if _debug_limit_execution_to_num_blocks is not None: input_blocks_list = input_blocks_list[:_debug_limit_execution_to_num_blocks] @@ -111,8 +111,8 @@ def execute( self.warn_on_high_local_memory_store_usage() bar_name = ExchangeTaskSpec.REDUCE_SUB_PROGRESS_BAR_NAME - assert bar_name in sub_progress_bar_dict, sub_progress_bar_dict - reduce_bar = sub_progress_bar_dict[bar_name] + assert bar_name in sub_progress_updaters, sub_progress_updaters + reduce_bar = sub_progress_updaters[bar_name] if _debug_limit_execution_to_num_blocks is not None: output_num_blocks = _debug_limit_execution_to_num_blocks diff --git a/python/ray/data/_internal/planner/exchange/push_based_shuffle_task_scheduler.py b/python/ray/data/_internal/planner/exchange/push_based_shuffle_task_scheduler.py index af0c26f96caa..c059e9cacc49 100644 --- a/python/ray/data/_internal/planner/exchange/push_based_shuffle_task_scheduler.py +++ b/python/ray/data/_internal/planner/exchange/push_based_shuffle_task_scheduler.py @@ -534,10 +534,10 @@ def merge(*args, **kwargs): [output_num_blocks, stage.merge_schedule, *self._exchange_spec._map_args], ) - sub_progress_bar_dict = task_ctx.sub_progress_bar_dict + sub_progress_updaters = task_ctx.sub_progress_updaters bar_name = ExchangeTaskSpec.MAP_SUB_PROGRESS_BAR_NAME - assert bar_name in sub_progress_bar_dict, sub_progress_bar_dict - map_bar = sub_progress_bar_dict[bar_name] + assert bar_name in sub_progress_updaters, sub_progress_updaters + map_bar = sub_progress_updaters[bar_name] map_stage_executor = _PipelinedStageExecutor( map_stage_iter, stage.num_map_tasks_per_round, progress_bar=map_bar ) @@ -586,8 +586,8 @@ def merge(*args, **kwargs): # Execute and wait for the reduce stage. bar_name = ExchangeTaskSpec.REDUCE_SUB_PROGRESS_BAR_NAME - assert bar_name in sub_progress_bar_dict, sub_progress_bar_dict - reduce_bar = sub_progress_bar_dict[bar_name] + assert bar_name in sub_progress_updaters, sub_progress_updaters + reduce_bar = sub_progress_updaters[bar_name] shuffle_reduce = cached_remote_fn(self._exchange_spec.reduce) reduce_stage_iter = _ReduceStageIterator( diff --git a/python/ray/data/_internal/planner/exchange/split_repartition_task_scheduler.py b/python/ray/data/_internal/planner/exchange/split_repartition_task_scheduler.py index 1c3a563ef694..5d42f4d96aed 100644 --- a/python/ray/data/_internal/planner/exchange/split_repartition_task_scheduler.py +++ b/python/ray/data/_internal/planner/exchange/split_repartition_task_scheduler.py @@ -82,10 +82,10 @@ def execute( split_block_refs.append(b) split_metadata.extend(m) - sub_progress_bar_dict = ctx.sub_progress_bar_dict + sub_progress_updaters = ctx.sub_progress_updaters bar_name = ShuffleTaskSpec.SPLIT_REPARTITION_SUB_PROGRESS_BAR_NAME - assert bar_name in sub_progress_bar_dict, sub_progress_bar_dict - reduce_bar = sub_progress_bar_dict[bar_name] + assert bar_name in sub_progress_updaters, sub_progress_updaters + reduce_bar = sub_progress_updaters[bar_name] reduce_task = cached_remote_fn(self._exchange_spec.reduce) reduce_return = [ diff --git a/python/ray/data/_internal/planner/sort.py b/python/ray/data/_internal/planner/sort.py index 852154c66c36..3c614bb821da 100644 --- a/python/ray/data/_internal/planner/sort.py +++ b/python/ray/data/_internal/planner/sort.py @@ -45,7 +45,7 @@ def fn( # Sample boundaries for sort key. if not sort_key.boundaries: - sample_bar = ctx.sub_progress_bar_dict[ + sample_bar = ctx.sub_progress_updaters[ SortTaskSpec.SORT_SAMPLE_SUB_PROGRESS_BAR_NAME ] boundaries = SortTaskSpec.sample_boundaries( diff --git a/python/ray/data/_internal/progress/base_progress.py b/python/ray/data/_internal/progress/base_progress.py index 17c83be7f5f3..f9824bf4d7ba 100644 --- a/python/ray/data/_internal/progress/base_progress.py +++ b/python/ray/data/_internal/progress/base_progress.py @@ -2,10 +2,11 @@ import threading import typing from abc import ABC, abstractmethod -from typing import Any, List, Optional +from dataclasses import dataclass +from typing import Any, Dict, List, Optional import ray -from ray.data._internal.execution.operators.sub_progress import SubProgressBarMixin +from ray.data._internal.execution.operators.sub_progress import SubProgressMixin from ray.data._internal.progress.utils import truncate_operator_name if typing.TYPE_CHECKING: @@ -127,8 +128,8 @@ def __init__( verbose_progress: bool, ): """Initialize the progress manager, create all necessary progress bars - and sub-progress bars for the given topology. Sub-progress bars are - created for operators that implement the SubProgressBarMixin. + and sub-progress trackers for the given topology. Concrete progress bars + are created only for operators that implement the SubProgressMixin. Args: dataset_id: id of Dataset @@ -192,27 +193,65 @@ def update_operator_progress( ... -class NoopSubProgressBar(BaseProgressBar): - """Sub-Progress Bar for Noop (Disabled) Progress Manager""" +@dataclass(frozen=True) +class ProgressMetrics: + name: str + total: Optional[int] + completed: int - def __init__(self, name: str, max_name_length: int): + +class SubProgressUpdater(BaseProgressBar): + """Driver-side helper that updates immutable sub-progress metrics.""" + + def __init__( + self, + metrics_by_name: Dict[str, ProgressMetrics], + name: str, + max_name_length: int, + display_bar: Optional[BaseProgressBar] = None, + ): + self._metrics_by_name = metrics_by_name + self._name = name self._max_name_length = max_name_length + self._display_bar = display_bar self._desc = truncate_operator_name(name, self._max_name_length) + def set_display_bar(self, display_bar: Optional[BaseProgressBar]) -> None: + self._display_bar = display_bar + metrics = self._metrics_by_name[self._name] + if self._display_bar is not None: + self._display_bar.set_description(self._desc) + self._display_bar.update(total=metrics.total) + if metrics.completed: + self._display_bar.update(metrics.completed) + def set_description(self, name: str) -> None: self._desc = truncate_operator_name(name, self._max_name_length) + if self._display_bar is not None: + self._display_bar.set_description(self._desc) def get_description(self) -> str: return self._desc def update(self, increment: int = 0, total: Optional[int] = None) -> None: - pass + metrics = self._metrics_by_name[self._name] + new_total = total if total is not None else metrics.total + new_completed = metrics.completed + increment + self._metrics_by_name[self._name] = ProgressMetrics( + name=metrics.name, + total=new_total, + completed=new_completed, + ) + if self._display_bar is not None: + self._display_bar.update(increment, total) def refresh(self): - pass + if self._display_bar is not None: + self._display_bar.refresh() def close(self): - pass + if self._display_bar is not None: + self._display_bar.close() class NoopExecutionProgressManager(BaseExecutionProgressManager): @@ -227,15 +266,13 @@ def __init__( ): for state in topology.values(): op = state.op - if not isinstance(op, SubProgressBarMixin): + if not isinstance(op, SubProgressMixin): + continue + updaters = op.get_sub_progress_updaters() + if updaters is None: continue - sub_pg_names = op.get_sub_progress_bar_names() - if sub_pg_names is not None: - for name in sub_pg_names: - pg = NoopSubProgressBar( - name=name, max_name_length=self.MAX_NAME_LENGTH - ) - op.set_sub_progress_bar(name, pg) + for updater in updaters.values(): + updater.set_display_bar(None) def start(self) -> None: pass diff --git a/python/ray/data/_internal/progress/logging_progress.py b/python/ray/data/_internal/progress/logging_progress.py index 6530c7202315..72937ef870fd 100644 --- a/python/ray/data/_internal/progress/logging_progress.py +++ b/python/ray/data/_internal/progress/logging_progress.py @@ -7,14 +7,13 @@ from ray._common.utils import env_integer from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer -from ray.data._internal.execution.operators.sub_progress import SubProgressBarMixin +from ray.data._internal.execution.operators.sub_progress import SubProgressMixin from ray.data._internal.execution.streaming_executor_state import ( format_op_state_summary, ) from ray.data._internal.progress.base_progress import ( BaseExecutionProgressManager, BaseProgressBar, - NoopSubProgressBar, ) from ray.data._internal.progress.utils import truncate_operator_name @@ -126,7 +125,7 @@ def __init__( continue total = op.num_output_rows_total() or 1 - contains_sub_progress_bars = isinstance(op, SubProgressBarMixin) + contains_sub_progress_bars = isinstance(op, SubProgressMixin) sub_progress_bar_enabled = show_op_progress and ( contains_sub_progress_bars or verbose_progress ) @@ -142,20 +141,20 @@ def __init__( if not contains_sub_progress_bars: continue - sub_pg_names = op.get_sub_progress_bar_names() - if sub_pg_names is None: + sub_progress_metrics = op.get_sub_progress_metrics() + sub_progress_updaters = op.get_sub_progress_updaters() + if sub_progress_metrics is None or sub_progress_updaters is None: continue - for name in sub_pg_names: + for name, metrics in sub_progress_metrics.items(): if sub_progress_bar_enabled: - pg = LoggingSubProgressBar( + display_pg = LoggingSubProgressBar( name=name, total=total, max_name_length=self.MAX_NAME_LENGTH ) - self._sub_progress_metrics[state].append(pg) else: - pg = NoopSubProgressBar( - name=name, max_name_length=self.MAX_NAME_LENGTH - ) - op.set_sub_progress_bar(name, pg) + display_pg = None + if display_pg is not None: + self._sub_progress_metrics[state].append(display_pg) + sub_progress_updaters[name].set_display_bar(display_pg) # Management def start(self): diff --git a/python/ray/data/_internal/progress/rich_progress.py b/python/ray/data/_internal/progress/rich_progress.py index d66fac911ab5..e6f4eb61d8f1 100644 --- a/python/ray/data/_internal/progress/rich_progress.py +++ b/python/ray/data/_internal/progress/rich_progress.py @@ -20,14 +20,13 @@ from rich.text import Text from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer -from ray.data._internal.execution.operators.sub_progress import SubProgressBarMixin +from ray.data._internal.execution.operators.sub_progress import SubProgressMixin from ray.data._internal.execution.streaming_executor_state import ( format_op_state_summary, ) from ray.data._internal.progress.base_progress import ( BaseExecutionProgressManager, BaseProgressBar, - NoopSubProgressBar, ) from ray.data._internal.progress.utils import truncate_operator_name @@ -179,7 +178,7 @@ def _setup_operator_progress(self, topology: "Topology"): if isinstance(op, InputDataBuffer): continue - contains_sub_progress_bars = isinstance(op, SubProgressBarMixin) + contains_sub_progress_bars = isinstance(op, SubProgressMixin) sub_progress_bar_enabled = self._show_op_progress and ( contains_sub_progress_bars or self._verbose_progress ) @@ -203,11 +202,12 @@ def _setup_operator_progress(self, topology: "Topology"): if not contains_sub_progress_bars: continue - sub_progress_bar_names = op.get_sub_progress_bar_names() - if sub_progress_bar_names is None: + sub_progress_metrics = op.get_sub_progress_metrics() + sub_progress_updaters = op.get_sub_progress_updaters() + if sub_progress_metrics is None or sub_progress_updaters is None: continue - for name in sub_progress_bar_names: + for name, metrics in sub_progress_metrics.items(): if sub_progress_bar_enabled: progress = self._make_progress_bar( _TREE_VERTICAL_SUB_PROGRESS, "", 10 @@ -221,7 +221,7 @@ def _setup_operator_progress(self, topology: "Topology"): count_str="0/?", ) rows.append(progress) - pg = RichSubProgressBar( + display_pg = RichSubProgressBar( name=name, total=total, progress=progress, @@ -229,11 +229,10 @@ def _setup_operator_progress(self, topology: "Topology"): max_name_length=self.MAX_NAME_LENGTH, ) else: - pg = NoopSubProgressBar( - name=name, max_name_length=self.MAX_NAME_LENGTH - ) - op.set_sub_progress_bar(name, pg) - self._sub_progress_bars.append(pg) + display_pg = None + sub_progress_updaters[name].set_display_bar(display_pg) + if display_pg is not None: + self._sub_progress_bars.append(display_pg) if rows: self._layout_table.add_row(Text(f" {_TREE_VERTICAL}", no_wrap=True)) for row in rows: diff --git a/python/ray/data/_internal/progress/tqdm_progress.py b/python/ray/data/_internal/progress/tqdm_progress.py index 42dc8f991393..fff443998b3d 100644 --- a/python/ray/data/_internal/progress/tqdm_progress.py +++ b/python/ray/data/_internal/progress/tqdm_progress.py @@ -3,14 +3,13 @@ from typing import Dict, List, Optional from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer -from ray.data._internal.execution.operators.sub_progress import SubProgressBarMixin +from ray.data._internal.execution.operators.sub_progress import SubProgressMixin from ray.data._internal.execution.streaming_executor_state import ( format_op_state_summary, ) from ray.data._internal.progress.base_progress import ( BaseExecutionProgressManager, BaseProgressBar, - NoopSubProgressBar, ) from ray.data._internal.progress.progress_bar import ProgressBar @@ -81,7 +80,7 @@ def __init__( continue total = op.num_output_rows_total() or 1 - contains_sub_progress_bars = isinstance(op, SubProgressBarMixin) + contains_sub_progress_bars = isinstance(op, SubProgressMixin) sub_progress_bar_enabled = show_op_progress and ( contains_sub_progress_bars or verbose_progress ) @@ -102,12 +101,13 @@ def __init__( if not contains_sub_progress_bars: continue - sub_pg_names = op.get_sub_progress_bar_names() - if sub_pg_names is None: + sub_progress_metrics = op.get_sub_progress_metrics() + sub_progress_updaters = op.get_sub_progress_updaters() + if sub_progress_metrics is None or sub_progress_updaters is None: continue - for name in sub_pg_names: + for name, metrics in sub_progress_metrics.items(): if sub_progress_bar_enabled: - pg = TqdmSubProgressBar( + display_pg = TqdmSubProgressBar( name=f" *- {name}", total=total, unit="row", @@ -117,12 +117,10 @@ def __init__( ) num_progress_bars += 1 else: - pg = NoopSubProgressBar( - name=f" *- {name}", - max_name_length=self.MAX_NAME_LENGTH, - ) - op.set_sub_progress_bar(name, pg) - self._sub_progress_bars.append(pg) + display_pg = None + sub_progress_updaters[name].set_display_bar(display_pg) + if display_pg is not None: + self._sub_progress_bars.append(display_pg) # Management def start(self): diff --git a/python/ray/data/tests/test_gpu_shuffle.py b/python/ray/data/tests/test_gpu_shuffle.py index e5a56fda5e10..72331d0b76a9 100644 --- a/python/ray/data/tests/test_gpu_shuffle.py +++ b/python/ray/data/tests/test_gpu_shuffle.py @@ -319,20 +319,18 @@ def test_incremental_resource_usage_is_one_gpu(self): def test_progress_bar_names(self): op = self._make_op() - names = op.get_sub_progress_bar_names() + names = list(op.get_sub_progress_metrics().keys()) assert names == ["GPU Shuffle", "GPU Reduce"] - def test_set_sub_progress_bar_shuffle(self): + def test_sub_progress_metric_shuffle(self): op = self._make_op() - mock_bar = MagicMock() - op.set_sub_progress_bar("GPU Shuffle", mock_bar) - assert op._shuffle_bar is mock_bar + metric = op.get_sub_progress_metrics()["GPU Shuffle"] + assert metric is not None - def test_set_sub_progress_bar_reduce(self): + def test_sub_progress_metric_reduce(self): op = self._make_op() - mock_bar = MagicMock() - op.set_sub_progress_bar("GPU Reduce", mock_bar) - assert op._reduce_bar is mock_bar + metric = op.get_sub_progress_metrics()["GPU Reduce"] + assert metric is not None def test_initial_state(self): op = self._make_op() diff --git a/python/ray/data/tests/test_operators.py b/python/ray/data/tests/test_operators.py index 73fc2b6fb353..7b179def779b 100644 --- a/python/ray/data/tests/test_operators.py +++ b/python/ray/data/tests/test_operators.py @@ -15,7 +15,7 @@ from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer from ray.data._internal.execution.operators.map_operator import MapOperator from ray.data._internal.execution.util import make_ref_bundles -from ray.data._internal.progress.base_progress import NoopSubProgressBar +from ray.data._internal.progress.base_progress import ProgressMetrics from ray.data.block import BlockAccessor from ray.data.context import DataContext from ray.data.tests.util import ( @@ -72,8 +72,8 @@ def test_input_data_buffer(ray_start_regular_shared): def test_all_to_all_operator(): def dummy_all_transform(bundles: List[RefBundle], ctx): - assert len(ctx.sub_progress_bar_dict) == 2 - assert list(ctx.sub_progress_bar_dict.keys()) == ["Test1", "Test2"] + assert len(ctx.sub_progress_metrics) == 2 + assert list(ctx.sub_progress_metrics.keys()) == ["Test1", "Test2"] return make_ref_bundles([[1, 2], [3, 4]]), {"FooStats": []} input_op = InputDataBuffer( @@ -89,13 +89,9 @@ def dummy_all_transform(bundles: List[RefBundle], ctx): name="TestAll", ) - # Initialize progress bar. - for name in op.get_sub_progress_bar_names(): - pg = NoopSubProgressBar( - name=name, - max_name_length=100, - ) - op.set_sub_progress_bar(name, pg) + metrics = op.get_sub_progress_metrics() + assert list(metrics.keys()) == ["Test1", "Test2"] + assert all(isinstance(metric, ProgressMetrics) for metric in metrics.values()) # Feed data. op.start(ExecutionOptions()) From 44f0f73547b6fa3e5350ec57f6a95d2b49ae3bf0 Mon Sep 17 00:00:00 2001 From: 400Ping Date: Thu, 2 Apr 2026 01:38:01 +0800 Subject: [PATCH 2/6] update Signed-off-by: 400Ping --- .../execution/interfaces/task_context.py | 16 ---------------- .../operators/base_physical_operator.py | 3 +++ .../execution/operators/sub_progress.py | 13 +------------ .../data/_internal/progress/logging_progress.py | 2 +- .../ray/data/_internal/progress/rich_progress.py | 2 +- .../ray/data/_internal/progress/tqdm_progress.py | 2 +- 6 files changed, 7 insertions(+), 31 deletions(-) diff --git a/python/ray/data/_internal/execution/interfaces/task_context.py b/python/ray/data/_internal/execution/interfaces/task_context.py index c1383b291bc2..df8f7186ca7a 100644 --- a/python/ray/data/_internal/execution/interfaces/task_context.py +++ b/python/ray/data/_internal/execution/interfaces/task_context.py @@ -92,19 +92,3 @@ def current(cls, context: "TaskContext") -> Iterator["TaskContext"]: yield context finally: cls.reset_current() - - @property - def sub_progress_bar_dict(self): - return self.sub_progress_updaters - - @sub_progress_bar_dict.setter - def sub_progress_bar_dict(self, value): - self.sub_progress_updaters = value - - @property - def sub_progress_tracker_dict(self): - return self.sub_progress_updaters - - @sub_progress_tracker_dict.setter - def sub_progress_tracker_dict(self, value): - self.sub_progress_updaters = value diff --git a/python/ray/data/_internal/execution/operators/base_physical_operator.py b/python/ray/data/_internal/execution/operators/base_physical_operator.py index f31d78fa0daf..7874d5b044e6 100644 --- a/python/ray/data/_internal/execution/operators/base_physical_operator.py +++ b/python/ray/data/_internal/execution/operators/base_physical_operator.py @@ -133,6 +133,9 @@ def __init__( self._next_task_index = 0 self._num_outputs = num_outputs self._output_rows = 0 + # Keep the legacy attribute name during the transition. Some internal + # call sites still read this field directly instead of using the mixin. + self._sub_progress_bar_names = sub_progress_bar_names self._sub_progress_names = sub_progress_bar_names self._sub_progress_metrics = ( { diff --git a/python/ray/data/_internal/execution/operators/sub_progress.py b/python/ray/data/_internal/execution/operators/sub_progress.py index b1148c2525a3..a39c5c08a632 100644 --- a/python/ray/data/_internal/execution/operators/sub_progress.py +++ b/python/ray/data/_internal/execution/operators/sub_progress.py @@ -1,6 +1,6 @@ import typing from abc import ABC, abstractmethod -from typing import Dict, List, Optional +from typing import Dict, Optional if typing.TYPE_CHECKING: from ray.data._internal.progress.base_progress import ( @@ -22,14 +22,3 @@ def get_sub_progress_metrics(self) -> Optional[Dict[str, "ProgressMetrics"]]: def get_sub_progress_updaters(self) -> Optional[Dict[str, "SubProgressUpdater"]]: """Returns driver-side helpers for mutating sub-progress metrics.""" return None - - # Backward-compatible wrappers during the transition away from "bar" naming. - def get_sub_progress_bar_names(self) -> Optional[List[str]]: - metrics = self.get_sub_progress_metrics() - return list(metrics.keys()) if metrics is not None else None - - def get_sub_progress_names(self) -> Optional[List[str]]: - return self.get_sub_progress_bar_names() - - -SubProgressBarMixin = SubProgressMixin diff --git a/python/ray/data/_internal/progress/logging_progress.py b/python/ray/data/_internal/progress/logging_progress.py index 72937ef870fd..2d4e535e9035 100644 --- a/python/ray/data/_internal/progress/logging_progress.py +++ b/python/ray/data/_internal/progress/logging_progress.py @@ -145,7 +145,7 @@ def __init__( sub_progress_updaters = op.get_sub_progress_updaters() if sub_progress_metrics is None or sub_progress_updaters is None: continue - for name, metrics in sub_progress_metrics.items(): + for name in sub_progress_metrics: if sub_progress_bar_enabled: display_pg = LoggingSubProgressBar( name=name, total=total, max_name_length=self.MAX_NAME_LENGTH diff --git a/python/ray/data/_internal/progress/rich_progress.py b/python/ray/data/_internal/progress/rich_progress.py index e6f4eb61d8f1..58a29c9f6638 100644 --- a/python/ray/data/_internal/progress/rich_progress.py +++ b/python/ray/data/_internal/progress/rich_progress.py @@ -207,7 +207,7 @@ def _setup_operator_progress(self, topology: "Topology"): if sub_progress_metrics is None or sub_progress_updaters is None: continue - for name, metrics in sub_progress_metrics.items(): + for name in sub_progress_metrics: if sub_progress_bar_enabled: progress = self._make_progress_bar( _TREE_VERTICAL_SUB_PROGRESS, "", 10 diff --git a/python/ray/data/_internal/progress/tqdm_progress.py b/python/ray/data/_internal/progress/tqdm_progress.py index fff443998b3d..03b8f196c7f9 100644 --- a/python/ray/data/_internal/progress/tqdm_progress.py +++ b/python/ray/data/_internal/progress/tqdm_progress.py @@ -105,7 +105,7 @@ def __init__( sub_progress_updaters = op.get_sub_progress_updaters() if sub_progress_metrics is None or sub_progress_updaters is None: continue - for name, metrics in sub_progress_metrics.items(): + for name in sub_progress_metrics: if sub_progress_bar_enabled: display_pg = TqdmSubProgressBar( name=f" *- {name}", From ed8fd4ab0a939e87e8edb6b774fea97ba7180191 Mon Sep 17 00:00:00 2001 From: 400Ping Date: Sat, 2 May 2026 00:34:48 +0800 Subject: [PATCH 3/6] fix accord to review Signed-off-by: 400Ping --- .../data/_internal/execution/operators/base_physical_operator.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/ray/data/_internal/execution/operators/base_physical_operator.py b/python/ray/data/_internal/execution/operators/base_physical_operator.py index 7874d5b044e6..974713bc037a 100644 --- a/python/ray/data/_internal/execution/operators/base_physical_operator.py +++ b/python/ray/data/_internal/execution/operators/base_physical_operator.py @@ -136,7 +136,6 @@ def __init__( # Keep the legacy attribute name during the transition. Some internal # call sites still read this field directly instead of using the mixin. self._sub_progress_bar_names = sub_progress_bar_names - self._sub_progress_names = sub_progress_bar_names self._sub_progress_metrics = ( { name: ProgressMetrics(name=name, total=None, completed=0) From cd5756590dfc7db89659b0953a90ff73538e3a85 Mon Sep 17 00:00:00 2001 From: 400Ping Date: Thu, 7 May 2026 12:06:27 +0800 Subject: [PATCH 4/6] [Fix] Decouple sub-progress updaters from display bars Signed-off-by: 400Ping --- .../operators/base_physical_operator.py | 32 +++---- .../execution/operators/hash_shuffle.py | 30 ++----- .../execution/operators/sub_progress.py | 28 +++++- .../_internal/gpu_shuffle/hash_shuffle.py | 34 +++----- .../data/_internal/progress/base_progress.py | 65 +++++--------- .../_internal/progress/logging_progress.py | 87 +++++-------------- .../data/_internal/progress/rich_progress.py | 45 ++++++++-- .../data/_internal/progress/tqdm_progress.py | 38 ++++++-- python/ray/data/tests/test_progress_bar.py | 23 +++++ 9 files changed, 190 insertions(+), 192 deletions(-) diff --git a/python/ray/data/_internal/execution/operators/base_physical_operator.py b/python/ray/data/_internal/execution/operators/base_physical_operator.py index 974713bc037a..3e69fc93d77c 100644 --- a/python/ray/data/_internal/execution/operators/base_physical_operator.py +++ b/python/ray/data/_internal/execution/operators/base_physical_operator.py @@ -1,4 +1,5 @@ import abc +import typing from typing import List, Optional from typing_extensions import override @@ -11,13 +12,12 @@ TaskContext, ) from ray.data._internal.execution.operators.sub_progress import SubProgressMixin -from ray.data._internal.progress.base_progress import ( - ProgressMetrics, - SubProgressUpdater, -) from ray.data._internal.stats import StatsDict from ray.data.context import DataContext +if typing.TYPE_CHECKING: + from ray.data._internal.progress.base_progress import ProgressMetrics + class InternalQueueOperatorMixin(PhysicalOperator, abc.ABC): @property @@ -136,25 +136,13 @@ def __init__( # Keep the legacy attribute name during the transition. Some internal # call sites still read this field directly instead of using the mixin. self._sub_progress_bar_names = sub_progress_bar_names - self._sub_progress_metrics = ( - { - name: ProgressMetrics(name=name, total=None, completed=0) - for name in sub_progress_bar_names - } - if sub_progress_bar_names is not None - else None - ) - self._sub_progress_updaters = ( - { - name: SubProgressUpdater( - self._sub_progress_metrics, - name=name, - max_name_length=100, - ) - for name in sub_progress_bar_names - } + ( + self._sub_progress_metrics, + self._sub_progress_updaters, + ) = ( + self._create_sub_progress_state(sub_progress_bar_names) if sub_progress_bar_names is not None - else None + else (None, None) ) self._input_buffer: FIFOBundleQueue = FIFOBundleQueue() self._output_buffer: FIFOBundleQueue = FIFOBundleQueue() diff --git a/python/ray/data/_internal/execution/operators/hash_shuffle.py b/python/ray/data/_internal/execution/operators/hash_shuffle.py index 0e6a956a7db7..4b6b95c6ff3a 100644 --- a/python/ray/data/_internal/execution/operators/hash_shuffle.py +++ b/python/ray/data/_internal/execution/operators/hash_shuffle.py @@ -56,10 +56,7 @@ from ray.data._internal.execution.operators.sub_progress import SubProgressMixin from ray.data._internal.logical.interfaces import LogicalOperator from ray.data._internal.output_buffer import BlockOutputBuffer, OutputBlockSizeOption -from ray.data._internal.progress.base_progress import ( - ProgressMetrics, - SubProgressUpdater, -) +from ray.data._internal.progress.base_progress import ProgressMetrics from ray.data._internal.stats import OpRuntimeMetrics from ray.data._internal.table_block import TableBlockAccessor from ray.data._internal.util import GiB, MiB @@ -647,27 +644,10 @@ def __init__( self._health_monitoring_start_time: float = 0.0 self._pending_aggregators_refs: Optional[List[ObjectRef[ActorHandle]]] = None - # Driver-side sub-progress metrics for manager-owned display state. - self._sub_progress_metrics = { - self.shuffle_name: ProgressMetrics( - name=self.shuffle_name, total=None, completed=0 - ), - self.reduce_name: ProgressMetrics( - name=self.reduce_name, total=None, completed=0 - ), - } - self._sub_progress_updaters = { - self.shuffle_name: SubProgressUpdater( - self._sub_progress_metrics, - name=self.shuffle_name, - max_name_length=100, - ), - self.reduce_name: SubProgressUpdater( - self._sub_progress_metrics, - name=self.reduce_name, - max_name_length=100, - ), - } + ( + self._sub_progress_metrics, + self._sub_progress_updaters, + ) = self._create_sub_progress_state([self.shuffle_name, self.reduce_name]) self._shuffle_metrics = OpRuntimeMetrics(self) self._reduce_metrics = OpRuntimeMetrics(self) diff --git a/python/ray/data/_internal/execution/operators/sub_progress.py b/python/ray/data/_internal/execution/operators/sub_progress.py index a39c5c08a632..afb63b64123d 100644 --- a/python/ray/data/_internal/execution/operators/sub_progress.py +++ b/python/ray/data/_internal/execution/operators/sub_progress.py @@ -1,6 +1,6 @@ import typing from abc import ABC, abstractmethod -from typing import Dict, Optional +from typing import Dict, List, Optional, Tuple if typing.TYPE_CHECKING: from ray.data._internal.progress.base_progress import ( @@ -19,6 +19,30 @@ def get_sub_progress_metrics(self) -> Optional[Dict[str, "ProgressMetrics"]]: """ ... + @abstractmethod def get_sub_progress_updaters(self) -> Optional[Dict[str, "SubProgressUpdater"]]: """Returns driver-side helpers for mutating sub-progress metrics.""" - return None + ... + + @classmethod + def _create_sub_progress_state( + cls, names: List[str] + ) -> Tuple[Dict[str, "ProgressMetrics"], Dict[str, "SubProgressUpdater"]]: + from ray.data._internal.progress.base_progress import ( + BaseExecutionProgressManager, + ProgressMetrics, + SubProgressUpdater, + ) + + metrics = { + name: ProgressMetrics(name=name, total=None, completed=0) for name in names + } + updaters = { + name: SubProgressUpdater( + metrics, + name=name, + max_name_length=BaseExecutionProgressManager.MAX_NAME_LENGTH, + ) + for name in names + } + return metrics, updaters diff --git a/python/ray/data/_internal/gpu_shuffle/hash_shuffle.py b/python/ray/data/_internal/gpu_shuffle/hash_shuffle.py index e19b521b7441..eb205d630876 100644 --- a/python/ray/data/_internal/gpu_shuffle/hash_shuffle.py +++ b/python/ray/data/_internal/gpu_shuffle/hash_shuffle.py @@ -31,10 +31,7 @@ _get_total_cluster_resources, ) from ray.data._internal.execution.operators.sub_progress import SubProgressMixin -from ray.data._internal.progress.base_progress import ( - ProgressMetrics, - SubProgressUpdater, -) +from ray.data._internal.progress.base_progress import ProgressMetrics from ray.data._internal.stats import OpRuntimeMetrics from ray.data.block import Block, BlockAccessor, BlockStats, to_stats from ray.data.context import DataContext @@ -420,6 +417,9 @@ class GPUShuffleOperator(PhysicalOperator, SubProgressMixin): single call. """ + GPU_SHUFFLE_PROGRESS_NAME = "GPU Shuffle" + GPU_REDUCE_PROGRESS_NAME = "GPU Reduce" + def __init__( self, input_op: PhysicalOperator, @@ -468,22 +468,12 @@ def __init__( self._shuffled_blocks_stats: List[BlockStats] = [] self._output_blocks_stats: List[BlockStats] = [] - self._sub_progress_metrics = { - "GPU Shuffle": ProgressMetrics(name="GPU Shuffle", total=None, completed=0), - "GPU Reduce": ProgressMetrics(name="GPU Reduce", total=None, completed=0), - } - self._sub_progress_updaters = { - "GPU Shuffle": SubProgressUpdater( - self._sub_progress_metrics, - name="GPU Shuffle", - max_name_length=100, - ), - "GPU Reduce": SubProgressUpdater( - self._sub_progress_metrics, - name="GPU Reduce", - max_name_length=100, - ), - } + ( + self._sub_progress_metrics, + self._sub_progress_updaters, + ) = self._create_sub_progress_state( + [self.GPU_SHUFFLE_PROGRESS_NAME, self.GPU_REDUCE_PROGRESS_NAME] + ) # Metrics self._shuffle_metrics = OpRuntimeMetrics(self) @@ -523,7 +513,7 @@ def _on_insert_done(idx: int = task_idx) -> None: task_id=task.get_task_id(), ) - self._sub_progress_updaters["GPU Shuffle"].update( + self._sub_progress_updaters[self.GPU_SHUFFLE_PROGRESS_NAME].update( total=self._next_block_idx ) @@ -602,7 +592,7 @@ def _on_bundle_ready(bundle: RefBundle) -> None: self._estimated_output_num_rows = num_rows # Update Finalize progress bar - self._sub_progress_updaters["GPU Reduce"].update( + self._sub_progress_updaters[self.GPU_REDUCE_PROGRESS_NAME].update( increment=bundle.num_rows() or 0, total=self.num_output_rows_total() ) diff --git a/python/ray/data/_internal/progress/base_progress.py b/python/ray/data/_internal/progress/base_progress.py index f9824bf4d7ba..c61b9ff371ff 100644 --- a/python/ray/data/_internal/progress/base_progress.py +++ b/python/ray/data/_internal/progress/base_progress.py @@ -3,10 +3,9 @@ import typing from abc import ABC, abstractmethod from dataclasses import dataclass -from typing import Any, Dict, List, Optional +from typing import Any, Callable, Dict, List, Optional import ray -from ray.data._internal.execution.operators.sub_progress import SubProgressMixin from ray.data._internal.progress.utils import truncate_operator_name if typing.TYPE_CHECKING: @@ -195,6 +194,8 @@ def update_operator_progress( @dataclass(frozen=True) class ProgressMetrics: + """Immutable snapshot of sub-progress state exposed to progress managers.""" + name: str total: Optional[int] completed: int @@ -208,50 +209,40 @@ def __init__( metrics_by_name: Dict[str, ProgressMetrics], name: str, max_name_length: int, - display_bar: Optional[BaseProgressBar] = None, ): self._metrics_by_name = metrics_by_name self._name = name self._max_name_length = max_name_length - self._display_bar = display_bar self._desc = truncate_operator_name(name, self._max_name_length) + self._lock = threading.Lock() + self._update_callbacks: List[Callable[[ProgressMetrics], None]] = [] - def set_display_bar(self, display_bar: Optional[BaseProgressBar]) -> None: - self._display_bar = display_bar - metrics = self._metrics_by_name[self._name] - if self._display_bar is not None: - self._display_bar.set_description(self._desc) - self._display_bar.update(total=metrics.total) - if metrics.completed: - self._display_bar.update(metrics.completed) + def add_update_callback(self, callback: Callable[[ProgressMetrics], None]) -> None: + with self._lock: + self._update_callbacks.append(callback) + metrics = self._metrics_by_name[self._name] + callback(metrics) def set_description(self, name: str) -> None: self._desc = truncate_operator_name(name, self._max_name_length) - if self._display_bar is not None: - self._display_bar.set_description(self._desc) def get_description(self) -> str: return self._desc def update(self, increment: int = 0, total: Optional[int] = None) -> None: - metrics = self._metrics_by_name[self._name] - new_total = total if total is not None else metrics.total - new_completed = metrics.completed + increment - self._metrics_by_name[self._name] = ProgressMetrics( - name=metrics.name, - total=new_total, - completed=new_completed, - ) - if self._display_bar is not None: - self._display_bar.update(increment, total) - - def refresh(self): - if self._display_bar is not None: - self._display_bar.refresh() - - def close(self): - if self._display_bar is not None: - self._display_bar.close() + with self._lock: + metrics = self._metrics_by_name[self._name] + new_total = total if total is not None else metrics.total + new_completed = metrics.completed + increment + updated_metrics = ProgressMetrics( + name=metrics.name, + total=new_total, + completed=new_completed, + ) + self._metrics_by_name[self._name] = updated_metrics + callbacks = list(self._update_callbacks) + for callback in callbacks: + callback(updated_metrics) class NoopExecutionProgressManager(BaseExecutionProgressManager): @@ -264,15 +255,7 @@ def __init__( show_op_progress: bool, verbose_progress: bool, ): - for state in topology.values(): - op = state.op - if not isinstance(op, SubProgressMixin): - continue - updaters = op.get_sub_progress_updaters() - if updaters is None: - continue - for updater in updaters.values(): - updater.set_display_bar(None) + pass def start(self) -> None: pass diff --git a/python/ray/data/_internal/progress/logging_progress.py b/python/ray/data/_internal/progress/logging_progress.py index 2d4e535e9035..95316e8734fa 100644 --- a/python/ray/data/_internal/progress/logging_progress.py +++ b/python/ray/data/_internal/progress/logging_progress.py @@ -11,10 +11,7 @@ from ray.data._internal.execution.streaming_executor_state import ( format_op_state_summary, ) -from ray.data._internal.progress.base_progress import ( - BaseExecutionProgressManager, - BaseProgressBar, -) +from ray.data._internal.progress.base_progress import BaseExecutionProgressManager from ray.data._internal.progress.utils import truncate_operator_name if typing.TYPE_CHECKING: @@ -33,54 +30,6 @@ class _LoggingMetrics: total: Optional[int] -class LoggingSubProgressBar(BaseProgressBar): - """Thin wrapper to provide identical interface to the ProgressBar. - - Internally passes relevant logging metrics to `LoggingExecutionProgressManager`. - Sub-progress is actually handled by Ray through operators, while operator-level - and total progress is handled by the `StreamingExecutor`. To ensure log-order, - this class helps to pass metric data to the progress manager so progress metrics - are logged centrally. - """ - - def __init__( - self, - name: str, - total: Optional[int] = None, - max_name_length: int = 100, - ): - """Initialize sub-progress bar - - Args: - name: name of sub-progress bar - total: total number of output rows. None for unknown. - max_name_length: maximum operator name length (unused). - """ - del max_name_length # unused - self._total = total - self._completed = 0 - self._name = name - - def set_description(self, name: str) -> None: - pass # unused - - def get_description(self) -> str: - return "" # unused - - def update(self, increment: int = 0, total: Optional[int] = None): - if total is not None: - self._total = total - self._completed += increment - - def get_logging_metrics(self) -> _LoggingMetrics: - return _LoggingMetrics( - name=f" - {self._name}", - desc=None, - completed=self._completed, - total=self._total, - ) - - class LoggingExecutionProgressManager(BaseExecutionProgressManager): """Execution progress display for non-tty situations, preventing spamming of progress reporting.""" @@ -115,9 +64,7 @@ def __init__( name="Total Progress", desc=None, completed=0, total=None ) self._op_progress_metrics: Dict["OpState", _LoggingMetrics] = {} - self._sub_progress_metrics: Dict[ - "OpState", List[LoggingSubProgressBar] - ] = defaultdict(list) + self._sub_progress_names: Dict["OpState", List[str]] = defaultdict(list) for state in self._topology.values(): op = state.op @@ -142,19 +89,11 @@ def __init__( continue sub_progress_metrics = op.get_sub_progress_metrics() - sub_progress_updaters = op.get_sub_progress_updaters() - if sub_progress_metrics is None or sub_progress_updaters is None: + if sub_progress_metrics is None: continue for name in sub_progress_metrics: if sub_progress_bar_enabled: - display_pg = LoggingSubProgressBar( - name=name, total=total, max_name_length=self.MAX_NAME_LENGTH - ) - else: - display_pg = None - if display_pg is not None: - self._sub_progress_metrics[state].append(display_pg) - sub_progress_updaters[name].set_display_bar(display_pg) + self._sub_progress_names[state].append(name) # Management def start(self): @@ -184,8 +123,22 @@ def refresh(self): if metrics is None: continue _log_op_or_sub_progress(metrics) - for pg in self._sub_progress_metrics[opstate]: - _log_op_or_sub_progress(pg.get_logging_metrics()) + if isinstance(opstate.op, SubProgressMixin): + sub_progress_metrics = opstate.op.get_sub_progress_metrics() + if sub_progress_metrics is None: + continue + for name in self._sub_progress_names[opstate]: + metrics = sub_progress_metrics.get(name) + if metrics is None: + continue + _log_op_or_sub_progress( + _LoggingMetrics( + name=f" - {metrics.name}", + desc=None, + completed=metrics.completed, + total=metrics.total, + ) + ) # finish logging logger.info(lastline) diff --git a/python/ray/data/_internal/progress/rich_progress.py b/python/ray/data/_internal/progress/rich_progress.py index 58a29c9f6638..8cdd6cfdc6ec 100644 --- a/python/ray/data/_internal/progress/rich_progress.py +++ b/python/ray/data/_internal/progress/rich_progress.py @@ -106,6 +106,12 @@ def update(self, increment: int = 0, total: Optional[int] = None) -> None: self._completed += increment self._update(self._completed, self._total) + def update_absolute(self, completed: int, total: Optional[int] = None) -> None: + if self._enabled: + self._completed = completed + self._total = total + self._update(self._completed, self._total) + def complete(self) -> None: if self._enabled: self._update(self._completed, self._completed) @@ -130,6 +136,7 @@ def __init__( ): self._dataset_id = dataset_id self._sub_progress_bars: List[BaseProgressBar] = [] + self._sub_progress_display: List[Tuple["OpState", str, RichSubProgressBar]] = [] self._show_op_progress = show_op_progress self._verbose_progress = verbose_progress self._start_time: Optional[float] = None @@ -203,19 +210,18 @@ def _setup_operator_progress(self, topology: "Topology"): continue sub_progress_metrics = op.get_sub_progress_metrics() - sub_progress_updaters = op.get_sub_progress_updaters() - if sub_progress_metrics is None or sub_progress_updaters is None: + if sub_progress_metrics is None: continue + sub_progress_updaters = op.get_sub_progress_updaters() - for name in sub_progress_metrics: + for name, metrics in sub_progress_metrics.items(): if sub_progress_bar_enabled: progress = self._make_progress_bar( _TREE_VERTICAL_SUB_PROGRESS, "", 10 ) - total = state.op.num_output_rows_total() tid = progress.add_task( name, - total=total if total is not None else 1, + total=metrics.total if metrics.total is not None else 1, start=True, rate_str="? rows/s", count_str="0/?", @@ -223,16 +229,24 @@ def _setup_operator_progress(self, topology: "Topology"): rows.append(progress) display_pg = RichSubProgressBar( name=name, - total=total, + total=metrics.total, progress=progress, tid=tid, max_name_length=self.MAX_NAME_LENGTH, ) else: display_pg = None - sub_progress_updaters[name].set_display_bar(display_pg) if display_pg is not None: + display_pg.update_absolute(metrics.completed, metrics.total) + self._sub_progress_display.append((state, name, display_pg)) self._sub_progress_bars.append(display_pg) + if ( + sub_progress_updaters is not None + and name in sub_progress_updaters + ): + sub_progress_updaters[name].add_update_callback( + _make_sub_progress_sync_callback(display_pg) + ) if rows: self._layout_table.add_row(Text(f" {_TREE_VERTICAL}", no_wrap=True)) for row in rows: @@ -344,6 +358,23 @@ def update_operator_progress( stats_str = format_op_state_summary(op_state, resource_manager) stats.plain = f"{_TREE_VERTICAL_INDENT}{stats_str}" + if isinstance(op_state.op, SubProgressMixin): + metrics_by_name = op_state.op.get_sub_progress_metrics() + if metrics_by_name is None: + return + for state, name, display_pg in self._sub_progress_display: + if state is not op_state or name not in metrics_by_name: + continue + metrics = metrics_by_name[name] + display_pg.update_absolute(metrics.completed, metrics.total) + + +def _make_sub_progress_sync_callback(display_pg: RichSubProgressBar): + def sync_display(metrics): + display_pg.update_absolute(metrics.completed, metrics.total) + + return sync_display + # utilities def _format_k(val: int) -> str: diff --git a/python/ray/data/_internal/progress/tqdm_progress.py b/python/ray/data/_internal/progress/tqdm_progress.py index 03b8f196c7f9..2d7a8049d249 100644 --- a/python/ray/data/_internal/progress/tqdm_progress.py +++ b/python/ray/data/_internal/progress/tqdm_progress.py @@ -1,6 +1,6 @@ import logging import typing -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Tuple from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer from ray.data._internal.execution.operators.sub_progress import SubProgressMixin @@ -60,6 +60,7 @@ def __init__( self._dataset_id = dataset_id self._sub_progress_bars: List[BaseProgressBar] = [] + self._sub_progress_display: List[Tuple["OpState", str, TqdmSubProgressBar]] = [] self._op_display: Dict["OpState", TqdmSubProgressBar] = {} num_progress_bars = 0 @@ -102,14 +103,14 @@ def __init__( continue sub_progress_metrics = op.get_sub_progress_metrics() - sub_progress_updaters = op.get_sub_progress_updaters() - if sub_progress_metrics is None or sub_progress_updaters is None: + if sub_progress_metrics is None: continue - for name in sub_progress_metrics: + sub_progress_updaters = op.get_sub_progress_updaters() + for name, metrics in sub_progress_metrics.items(): if sub_progress_bar_enabled: display_pg = TqdmSubProgressBar( name=f" *- {name}", - total=total, + total=metrics.total, unit="row", position=num_progress_bars, max_name_length=self.MAX_NAME_LENGTH, @@ -118,9 +119,17 @@ def __init__( num_progress_bars += 1 else: display_pg = None - sub_progress_updaters[name].set_display_bar(display_pg) if display_pg is not None: + display_pg.update_absolute(metrics.completed, metrics.total) + self._sub_progress_display.append((state, name, display_pg)) self._sub_progress_bars.append(display_pg) + if ( + sub_progress_updaters is not None + and name in sub_progress_updaters + ): + sub_progress_updaters[name].add_update_callback( + _make_sub_progress_sync_callback(display_pg) + ) # Management def start(self): @@ -158,3 +167,20 @@ def update_operator_progress( ) summary_str = format_op_state_summary(opstate, resource_manager) pg.set_description(f"- {opstate.op.name}: {summary_str}") + + if isinstance(opstate.op, SubProgressMixin): + metrics_by_name = opstate.op.get_sub_progress_metrics() + if metrics_by_name is None: + return + for state, name, display_pg in self._sub_progress_display: + if state is not opstate or name not in metrics_by_name: + continue + metrics = metrics_by_name[name] + display_pg.update_absolute(metrics.completed, metrics.total) + + +def _make_sub_progress_sync_callback(display_pg: TqdmSubProgressBar): + def sync_display(metrics): + display_pg.update_absolute(metrics.completed, metrics.total) + + return sync_display diff --git a/python/ray/data/tests/test_progress_bar.py b/python/ray/data/tests/test_progress_bar.py index e10ffd176791..e1a837a72e28 100644 --- a/python/ray/data/tests/test_progress_bar.py +++ b/python/ray/data/tests/test_progress_bar.py @@ -6,6 +6,10 @@ from pytest import fixture import ray +from ray.data._internal.progress.base_progress import ( + ProgressMetrics, + SubProgressUpdater, +) from ray.data._internal.progress.progress_bar import ProgressBar @@ -83,6 +87,25 @@ def wrapped_close(): pb.close() +def test_sub_progress_updater_updates_metrics_and_notifies_callback(): + metrics_by_name = { + "Shuffle": ProgressMetrics(name="Shuffle", total=None, completed=0) + } + updater = SubProgressUpdater(metrics_by_name, "Shuffle", max_name_length=100) + snapshots = [] + + updater.add_update_callback(snapshots.append) + updater.update(increment=3, total=10) + + assert metrics_by_name["Shuffle"] == ProgressMetrics( + name="Shuffle", total=10, completed=3 + ) + assert snapshots == [ + ProgressMetrics(name="Shuffle", total=None, completed=0), + ProgressMetrics(name="Shuffle", total=10, completed=3), + ] + + @pytest.mark.parametrize( "name, expected_description, max_line_length, should_emit_warning", [ From a0a2bc077a46bf72805166eb271c01a7bef84476 Mon Sep 17 00:00:00 2001 From: 400Ping Date: Thu, 7 May 2026 12:46:11 +0800 Subject: [PATCH 5/6] [CI] Trigger PR ref update Signed-off-by: 400Ping From 827b23a9033db84d8179454ba42472f79fcbabe0 Mon Sep 17 00:00:00 2001 From: 400Ping Date: Thu, 7 May 2026 12:56:41 +0800 Subject: [PATCH 6/6] [Chore] Fix accord to review Signed-off-by: 400Ping --- .../data/_internal/progress/base_progress.py | 9 +++++++ .../data/_internal/progress/rich_progress.py | 22 ++--------------- .../data/_internal/progress/tqdm_progress.py | 24 +++---------------- 3 files changed, 14 insertions(+), 41 deletions(-) diff --git a/python/ray/data/_internal/progress/base_progress.py b/python/ray/data/_internal/progress/base_progress.py index c61b9ff371ff..26cb598e8c55 100644 --- a/python/ray/data/_internal/progress/base_progress.py +++ b/python/ray/data/_internal/progress/base_progress.py @@ -245,6 +245,15 @@ def update(self, increment: int = 0, total: Optional[int] = None) -> None: callback(updated_metrics) +def make_sub_progress_sync_callback( + display_bar: Any, +) -> Callable[[ProgressMetrics], None]: + def sync_display(metrics: ProgressMetrics) -> None: + display_bar.update_absolute(metrics.completed, metrics.total) + + return sync_display + + class NoopExecutionProgressManager(BaseExecutionProgressManager): """Noop Data Execution Progress Display Manager (Progress Display Disabled)""" diff --git a/python/ray/data/_internal/progress/rich_progress.py b/python/ray/data/_internal/progress/rich_progress.py index 8cdd6cfdc6ec..cb547ccfb270 100644 --- a/python/ray/data/_internal/progress/rich_progress.py +++ b/python/ray/data/_internal/progress/rich_progress.py @@ -27,6 +27,7 @@ from ray.data._internal.progress.base_progress import ( BaseExecutionProgressManager, BaseProgressBar, + make_sub_progress_sync_callback, ) from ray.data._internal.progress.utils import truncate_operator_name @@ -136,7 +137,6 @@ def __init__( ): self._dataset_id = dataset_id self._sub_progress_bars: List[BaseProgressBar] = [] - self._sub_progress_display: List[Tuple["OpState", str, RichSubProgressBar]] = [] self._show_op_progress = show_op_progress self._verbose_progress = verbose_progress self._start_time: Optional[float] = None @@ -238,14 +238,13 @@ def _setup_operator_progress(self, topology: "Topology"): display_pg = None if display_pg is not None: display_pg.update_absolute(metrics.completed, metrics.total) - self._sub_progress_display.append((state, name, display_pg)) self._sub_progress_bars.append(display_pg) if ( sub_progress_updaters is not None and name in sub_progress_updaters ): sub_progress_updaters[name].add_update_callback( - _make_sub_progress_sync_callback(display_pg) + make_sub_progress_sync_callback(display_pg) ) if rows: self._layout_table.add_row(Text(f" {_TREE_VERTICAL}", no_wrap=True)) @@ -358,23 +357,6 @@ def update_operator_progress( stats_str = format_op_state_summary(op_state, resource_manager) stats.plain = f"{_TREE_VERTICAL_INDENT}{stats_str}" - if isinstance(op_state.op, SubProgressMixin): - metrics_by_name = op_state.op.get_sub_progress_metrics() - if metrics_by_name is None: - return - for state, name, display_pg in self._sub_progress_display: - if state is not op_state or name not in metrics_by_name: - continue - metrics = metrics_by_name[name] - display_pg.update_absolute(metrics.completed, metrics.total) - - -def _make_sub_progress_sync_callback(display_pg: RichSubProgressBar): - def sync_display(metrics): - display_pg.update_absolute(metrics.completed, metrics.total) - - return sync_display - # utilities def _format_k(val: int) -> str: diff --git a/python/ray/data/_internal/progress/tqdm_progress.py b/python/ray/data/_internal/progress/tqdm_progress.py index 2d7a8049d249..fa1eab228271 100644 --- a/python/ray/data/_internal/progress/tqdm_progress.py +++ b/python/ray/data/_internal/progress/tqdm_progress.py @@ -1,6 +1,6 @@ import logging import typing -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer from ray.data._internal.execution.operators.sub_progress import SubProgressMixin @@ -10,6 +10,7 @@ from ray.data._internal.progress.base_progress import ( BaseExecutionProgressManager, BaseProgressBar, + make_sub_progress_sync_callback, ) from ray.data._internal.progress.progress_bar import ProgressBar @@ -60,7 +61,6 @@ def __init__( self._dataset_id = dataset_id self._sub_progress_bars: List[BaseProgressBar] = [] - self._sub_progress_display: List[Tuple["OpState", str, TqdmSubProgressBar]] = [] self._op_display: Dict["OpState", TqdmSubProgressBar] = {} num_progress_bars = 0 @@ -121,14 +121,13 @@ def __init__( display_pg = None if display_pg is not None: display_pg.update_absolute(metrics.completed, metrics.total) - self._sub_progress_display.append((state, name, display_pg)) self._sub_progress_bars.append(display_pg) if ( sub_progress_updaters is not None and name in sub_progress_updaters ): sub_progress_updaters[name].add_update_callback( - _make_sub_progress_sync_callback(display_pg) + make_sub_progress_sync_callback(display_pg) ) # Management @@ -167,20 +166,3 @@ def update_operator_progress( ) summary_str = format_op_state_summary(opstate, resource_manager) pg.set_description(f"- {opstate.op.name}: {summary_str}") - - if isinstance(opstate.op, SubProgressMixin): - metrics_by_name = opstate.op.get_sub_progress_metrics() - if metrics_by_name is None: - return - for state, name, display_pg in self._sub_progress_display: - if state is not opstate or name not in metrics_by_name: - continue - metrics = metrics_by_name[name] - display_pg.update_absolute(metrics.completed, metrics.total) - - -def _make_sub_progress_sync_callback(display_pg: TqdmSubProgressBar): - def sync_display(metrics): - display_pg.update_absolute(metrics.completed, metrics.total) - - return sync_display