feat(PartitionedOutput): Add outputChannels support#1972
Conversation
a884aa0 to
960998f
Compare
e1eb062 to
e335c23
Compare
| }); | ||
| } | ||
|
|
||
| RowVectorPtr OptimizedPartitionedOutput::prepareOutput( |
There was a problem hiding this comment.
This is preparing input, not output. Rename to prepareInput
There was a problem hiding this comment.
Renamed to prepareSerializerInput
| return input; | ||
| } | ||
|
|
||
| std::vector<VectorPtr> outputColumns; |
There was a problem hiding this comment.
outputColumns -> reorderedInputColumns
There was a problem hiding this comment.
Renamed to serializerInputColumns as it is passed to the serializer append() and it only contains the unique columns from output.
| PartitionBuildContext& ctx) { | ||
| auto* rowVector = vector_->as<RowVector>(); | ||
| partitionedChildren_.reserve(rowVector->childrenSize()); | ||
| std::unordered_map<const BaseVector*, PartitionedVectorPtr> |
There was a problem hiding this comment.
Actually, I think the input-output mapping shall be done in PrestoIterativePartitioningSerializer::flushRowChildren(), not in PartitionedVector level. The PartitionedVector is NOT supposed to handle or know the remapping business which should happen in upper levels. Also, the change made here is hard to understand.
| }); | ||
| } | ||
|
|
||
| RowVectorPtr OptimizedPartitionedOutput::prepareOutput( |
There was a problem hiding this comment.
Actually, let's not do the mapping at AddInput time, but at flush time. The place it should happen is PrestoIterativePartitioningSerializer::flushRowChildren().
e335c23 to
f66413c
Compare
|
@yingsu00 I've moved the mapping to flush time. Now the serailizer includes a member outputToInputChannels_ for this mapping, and the input passed to append() is prepared in OptimizedPartitionedOutput to include only the unique columns from the output. |
|
|
||
| namespace facebook::velox::exec { | ||
|
|
||
| void OptimizedPartitionedOutput::initializeSerializerLayout() { |
There was a problem hiding this comment.
Why is the initialization of Serializer done in the operator? Move the mapping to the Serializer constructor
| }); | ||
| } | ||
|
|
||
| RowVectorPtr OptimizedPartitionedOutput::prepareSerializerInput( |
There was a problem hiding this comment.
Why do we still need this? The remapping shall be done at flush time but you are still mapping them at addInput time. You only need to set up the map once when the Serializer is created, then flushRowChildren shall flush the serialized columns out in the new order.
Suppose the outputType duplicates some input columns, constructing the input RowVector would make the Serializer do repeating job. Your previous change in PartitionedVector removed that repeating work, but it was at a wrong place and wrong level. The whole re-construct input vector thing is NOT supposed to happen at all.
There was a problem hiding this comment.
We could set up the mapping in the serializer constructor and pass the input vector directly to the serializer append(), as you suggested, and then apply the column mapping during the flush time.
My concern is that this would require all input columns to be partitioned before flush. While outputType may duplicate or reorder input columns, it may also prune some. Re-constructing the input vector to include only the unique columns referenced by the output columns could help us avoid partitioning unnecessary input columns. Does this seem reasonable to you?
Add outputChannels support in OptimizedPartitionedOutput.