FEAT-#6990: Implement lazy execution for the Ray virtual partitions. #6991
AndreyPavlenko wants to merge 1 commit into
bf2943d to ea540cc
2e3390b to d98324d
128509d to 8ce0b34
8cc6583 to b09a944
b09a944 to 92fe2f7
anmyachev left a comment
Judging by the annotations, we need to write a lot more tests to cover most of the changes.
| "RayWrapper", | ||
| "MaterializationHook", | ||
| "SignalActor", | ||
| "RayObjectRefTypes", |
This item has been deleted
| """ | ||
| if not isinstance(obj_ids, Sequence): | ||
| obj_ids = list(obj_ids) | ||
| obj_ids = list(obj_ids) if isinstance(obj_ids, Iterable) else [obj_ids] |
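A minimal sketch of what the hunk above appears to change, assuming its first two lines are the removed version and its last line is the added one. `FakeRef`, `normalize_old` and `normalize_new` are hypothetical names used only for illustration; they are not part of Modin or Ray.

```python
from collections.abc import Iterable, Sequence


def normalize_old(obj_ids):
    # Assumed previous behaviour: anything that is not a Sequence is passed
    # to list(), which raises TypeError for a single non-iterable object.
    if not isinstance(obj_ids, Sequence):
        obj_ids = list(obj_ids)
    return obj_ids


def normalize_new(obj_ids):
    # Assumed new behaviour: iterables are copied into a list, anything
    # else is wrapped in a one-element list.
    return list(obj_ids) if isinstance(obj_ids, Iterable) else [obj_ids]


class FakeRef:
    """Hypothetical stand-in for a single, non-iterable object reference."""


single = FakeRef()
assert normalize_new(single) == [single]
assert normalize_new((1, 2)) == [1, 2]
assert normalize_new(x for x in (1, 2)) == [1, 2]
# Caveat: str is an Iterable, so a string is split into characters.
assert normalize_new("ab") == ["a", "b"]
```

The sketch only compares the two normalization strategies on plain Python objects; how real object references behave under `isinstance(..., Iterable)` would have to be checked against the actual classes.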
| varname = "MODIN_LAZY_EXECUTION" | ||
| choices = ("Auto", "On", "Off") | ||
| choices = ("Auto", "On", "Off", "Axis") |
Why introduce a new mode?
| try: | ||
| ref = ray.get(ref, timeout=0) | ||
| except ray.exceptions.GetTimeoutError: | ||
| return False |
If an object has been calculated and placed in distributed storage, will materialization occur here?
If this approach can be effective, then it is worth considering the possibility of using it in other places.
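The question above is about the difference between fetching a value with a zero timeout and merely polling for readiness. The sketch below illustrates that difference with the standard library's `concurrent.futures.Future` as an analogy only; it does not use Ray, and the helper names `is_ready_by_get` and `is_ready_by_poll` are hypothetical.

```python
from concurrent.futures import Future, TimeoutError


def is_ready_by_get(future):
    # Mirrors the pattern in the hunk: try to fetch the value with a zero
    # timeout and treat a timeout as "not ready". When the value is ready,
    # it is actually retrieved, not just checked.
    try:
        future.result(timeout=0)
    except TimeoutError:
        return False
    return True


def is_ready_by_poll(future):
    # Only asks whether a result is available; nothing is fetched.
    return future.done()


pending = Future()
assert not is_ready_by_get(pending)
assert not is_ready_by_poll(pending)

pending.set_result([1, 2, 3])
assert is_ready_by_get(pending)
assert is_ready_by_poll(pending)
```

In the analogy, the fetch-based check pays for retrieving the value whenever it is ready. Whether the same cost applies to `ray.get(ref, timeout=0)` on an object already in distributed storage is exactly what the reviewer is asking.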
| class SlicerHook(MaterializationHook): | ||
| class SlicerHook(MaterializationHook, DeferredExecution): |
What is the idea behind this change?
| from .partition import PandasOnRayDataframePartition | ||
| class PandasOnRayDataframeVirtualPartition(BaseDataframeAxisPartition): |
Why not use this inheritance instead?
Suggested change:
- class PandasOnRayDataframeVirtualPartition(BaseDataframeAxisPartition):
+ class PandasOnRayDataframeVirtualPartition(PandasDataframeAxisPartition):
| _execution_wrapper = RayWrapper | ||
| materialize_futures = RayWrapper.materialize | ||
| if LazyExecution.get() in ("On", "Axis"): |
Whether to use this function or not is determined during the first import without the possibility of further replacement. As far as I remember, in all other places, functions are defined on each call.
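The concern above can be sketched as follows. `LazyMode` is a hypothetical stand-in for a configuration parameter that can change at runtime, and `_eager` and `_lazy` are placeholder implementations; none of these names come from Modin.

```python
class LazyMode:
    """Hypothetical runtime-changeable configuration parameter."""

    _value = "Off"

    @classmethod
    def get(cls):
        return cls._value

    @classmethod
    def put(cls, value):
        cls._value = value


def _eager():
    return "eager"


def _lazy():
    return "lazy"


# Import-time binding: the choice is made once, when the module is loaded.
if LazyMode.get() in ("On", "Axis"):
    bound_at_import = _lazy
else:
    bound_at_import = _eager


def resolved_per_call():
    # Call-time resolution: the configuration is consulted on every call.
    return _lazy() if LazyMode.get() in ("On", "Axis") else _eager()


LazyMode.put("On")
assert bound_at_import() == "eager"   # still the implementation chosen at import
assert resolved_per_call() == "lazy"  # follows the updated configuration
```

Resolving on each call costs one extra lookup per call, but keeps the behaviour consistent with a configuration that is changed after import.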
| @classmethod | ||
| @_inherit_docstrings(GenericRayDataframePartitionManager.get_indices) | ||
| def get_indices(cls, axis, partitions, index_func=None): |
Have you tried making lazy changes to the already existing get_indices? (without overriding)
When we call this get_indices, do we trigger the entire lazy execution tree? If so, do we keep the result the consumers depend on?
E.g., if we had a lazy apply and computed indices, would we keep the result of the apply?
What this function is trying to do is avoid concatenating the partitions. This is possible when all the partitions are the result of a deferred split operation. Look at the description of the find_non_split_block() function; it has an example of such an execution tree. If we can find the non-split partition in the tree, we can take the index from it and thus avoid the concatenation.
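The idea described in the reply above can be sketched on plain Python objects. This is not the PR's `find_non_split_block()`; `Block`, `SplitChunk`, `find_common_source` and `get_index` are hypothetical names, and the real execution tree is more involved.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class Block:
    """Hypothetical non-split block that already knows its index."""

    index: List[str]


@dataclass
class SplitChunk:
    """Hypothetical deferred split: chunk `position` of `num_splits` from `source`."""

    source: Block
    position: int
    num_splits: int


def find_common_source(chunks) -> Optional[Block]:
    # Return the shared source block if `chunks` are exactly all the pieces
    # of one deferred split, in order; otherwise return None.
    if not chunks or not isinstance(chunks[0], SplitChunk):
        return None
    first = chunks[0]
    if len(chunks) != first.num_splits:
        return None
    for pos, chunk in enumerate(chunks):
        if (
            not isinstance(chunk, SplitChunk)
            or chunk.source is not first.source
            or chunk.position != pos
            or chunk.num_splits != first.num_splits
        ):
            return None
    return first.source


def get_index(chunks, concat_index: Callable):
    # Take the index from the non-split source when possible,
    # fall back to concatenation otherwise.
    source = find_common_source(chunks)
    if source is not None:
        return source.index
    return concat_index(chunks)


concat_calls = []


def concat_index(parts):
    concat_calls.append(len(parts))
    return [label for part in parts for label in part.index]


block = Block(index=["a", "b", "c", "d"])
chunks = [SplitChunk(block, i, 2) for i in range(2)]
assert get_index(chunks, concat_index) == ["a", "b", "c", "d"]
assert concat_calls == []  # the concatenation was skipped
```

When the chunks do not all come from one split, the sketch falls back to concatenation, which corresponds to the path the reply is trying to avoid.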
| PandasOnRayDataframeColumnPartition, | ||
| PandasOnRayDataframeRowPartition, | ||
| PandasOnRayDataframeVirtualPartition, |
Have you tried making changes to existing classes?
| axis = 0 | ||
| @remote_function | ||
| def _remote_concat(dfs): # pragma: no cover # noqa: GL08 |
Are you sure that the concat works as intended, given the message about the naming of the function arguments?
| import pandas | ||
| import ray | ||
| import ray.exceptions |
It seems that many of the changes in this file are not directly related to this pull request, so it would be great to move them into a separate pull request.
| # governing permissions and limitations under the License. | ||
| """Module houses class that implements ``GenericRayDataframePartitionManager`` using Ray.""" | ||
| import math |
| import math | |
| import math |
| PandasOnRayDataframeRowPartition, | ||
| ) | ||
| if LazyExecution.get() in ("On", "Axis"): |
This logic should probably be placed in modin/core/execution/ray/implementations/pandas_on_ray/partitioning/__init__.py.
| # governing permissions and limitations under the License. | ||
| """Module houses classes responsible for storing a virtual partition and applying a function to it.""" | ||
| import math |
| import math | |
| import math |
| """ | ||
| partition_type = PandasOnRayDataframePartition | ||
| instance_type = ray.ObjectRef |
| instance_type = ray.ObjectRef |
@anmyachev, can this be removed?
| list of lengths or None | ||
| Estimated chunk lengths, that could be different form the real ones. | ||
| bool | ||
| Whether the specified partitions represent the full block or just the |
Can you elaborate a little on this?
| manual_partition=False, | ||
| **kwargs, | ||
| ) -> Union[List[PandasOnRayDataframePartition], PandasOnRayDataframePartition]: | ||
| if not manual_partition: |
Why does this parameter have an effect only when it is False? Should we copy the related logic from the base class?
| lengths: Union[List[Union[ObjectRefType, int]], None], | ||
| ): | ||
| self.num_splits = num_splits | ||
| self.skip_chunks = set() |
Let's add a comment explaining what this is for.
| PandasOnRayDataframeColumnPartition | ||
| if self.axis | ||
| else PandasOnRayDataframeRowPartition |
Suggested change:
- PandasOnRayDataframeColumnPartition
- if self.axis
- else PandasOnRayDataframeRowPartition
+ PandasOnRayDataframeRowPartition
+ if self.axis
+ else PandasOnRayDataframeColumnPartition
Should this be so?
| if isinstance(obj, DeferredExecution): | ||
| if out_pos := getattr(obj, "out_pos", None): | ||
| if obj.has_result: | ||
| obj = obj.data |
Suggested change:
- obj = obj.data
+ out_append(obj.data)
I think it would be better to append obj.data in this if branch and remove the continue statements in all the else statements.
If obj.data is a list, we need to deconstruct it as well. Thus, we assign it to obj and go on to the if isinstance(obj, ListOrTuple) check.
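The trade-off discussed in this thread can be sketched with a simplified flattening loop. `Deferred` and `flatten` are hypothetical and much simpler than the PR's deconstruction code; the `fall_through` flag only exists to contrast the two variants.

```python
class Deferred:
    """Hypothetical deferred node; `data` is meaningful once `has_result` is True."""

    def __init__(self, data=None, has_result=False):
        self.data = data
        self.has_result = has_result


def flatten(items, fall_through=True):
    out = []
    stack = [iter(items)]
    while stack:
        try:
            obj = next(stack[-1])
        except StopIteration:
            stack.pop()
            continue
        if isinstance(obj, Deferred):
            if not obj.has_result:
                out.append("<pending>")
                continue
            if not fall_through:
                # Variant from the suggestion: append the result as is.
                out.append(obj.data)
                continue
            # Variant from the reply: unwrap and fall through, so that a
            # list result is deconstructed by the check below.
            obj = obj.data
        if isinstance(obj, (list, tuple)):
            stack.append(iter(obj))
        else:
            out.append(obj)
    return out


items = [1, Deferred([2, [3, 4]], has_result=True), Deferred(), 5]
assert flatten(items) == [1, 2, 3, 4, "<pending>", 5]
assert flatten(items, fall_through=False) == [1, [2, [3, 4]], "<pending>", 5]
```

With the fall-through, a list stored as a deferred result is deconstructed like any other list; appending it directly leaves it nested in the output.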
| if obj.subscribers == 0: | ||
| output[out_pos + 1] = 0 | ||
| result_consumers.remove(obj) | ||
| continue |
Suggested change:
- continue
| yield cls._deconstruct_chain(obj, output, stack, result_consumers) | ||
| out_append(_Tag.END) | ||
| elif isinstance(obj, ListOrTuple): | ||
| continue |
Suggested change:
- continue
| elif isinstance(obj, ListOrTuple): | ||
| continue | ||
| if isinstance(obj, ListOrTuple): |
Suggested change:
- if isinstance(obj, ListOrTuple):
+ elif isinstance(obj, ListOrTuple):
| out_append(_Tag.REF) | ||
| out_append(out_pos) | ||
| output[out_pos] = out_pos | ||
| if obj.subscribers == 0: | ||
| output[out_pos + 1] = 0 | ||
| result_consumers.remove(obj) |
As this code is duplicated
modin/modin/core/execution/ray/common/deferred_execution.py
Lines 326 to 333 in 92fe2f7
I don't think it makes sense to create a separate function just in order to reuse 3 lines of trivial code. Besides, it will cost a function call. Probably, a comment should be added here.
Yeah, a comment should be sufficient.
What do these changes do?
flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
git commit -s
docs/development/architecture.rst is up-to-date