
feat(PartitionedOutput): Add outputChannels support #1972

Open
xin-zhang2 wants to merge 1 commit into IBM:optimized_partitionedoutput from xin-zhang2:PartitionedOutput-output


@xin-zhang2
Member

@xin-zhang2 xin-zhang2 commented Apr 28, 2026

Add outputChannels support in OptimizedPartitionedOutput.

@xin-zhang2 xin-zhang2 requested a review from yingsu00 April 28, 2026 18:34
@xin-zhang2 xin-zhang2 force-pushed the PartitionedOutput-output branch 3 times, most recently from a884aa0 to 960998f Compare April 28, 2026 22:09
@xin-zhang2 xin-zhang2 force-pushed the PartitionedOutput-output branch 2 times, most recently from e1eb062 to e335c23 Compare May 7, 2026 15:20
});
}

RowVectorPtr OptimizedPartitionedOutput::prepareOutput(
Collaborator

This prepares the input, not the output. Rename it to prepareInput.

Member Author

Renamed to prepareSerializerInput

return input;
}

std::vector<VectorPtr> outputColumns;
Collaborator

outputColumns -> reorderedInputColumns

Member Author

Renamed to serializerInputColumns, since it is passed to the serializer's append() and contains only the unique columns referenced by the output.

Comment thread velox/vector/PartitionedVector.cpp Outdated
PartitionBuildContext& ctx) {
auto* rowVector = vector_->as<RowVector>();
partitionedChildren_.reserve(rowVector->childrenSize());
std::unordered_map<const BaseVector*, PartitionedVectorPtr>
Collaborator

Actually, I think the input-output mapping should be done in PrestoIterativePartitioningSerializer::flushRowChildren(), not at the PartitionedVector level. PartitionedVector is NOT supposed to handle or know about the remapping, which should happen at the upper levels. Also, the change made here is hard to understand.

});
}

RowVectorPtr OptimizedPartitionedOutput::prepareOutput(
Collaborator

Actually, let's do the mapping at flush time, not at addInput time. The place it should happen is PrestoIterativePartitioningSerializer::flushRowChildren().

@xin-zhang2 xin-zhang2 force-pushed the PartitionedOutput-output branch from e335c23 to f66413c Compare May 15, 2026 14:18
@xin-zhang2
Member Author

@yingsu00 I've moved the mapping to flush time. The serializer now has a member outputToInputChannels_ for this mapping, and the input passed to append() is prepared in OptimizedPartitionedOutput so that it includes only the unique columns referenced by the output.
Could you please take a look? Thanks.
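
For illustration, here is a minimal standalone sketch of how such a layout could be derived from the output channels. It is not the code in this PR: ColumnIndex, SerializerLayout, and buildSerializerLayout are hypothetical names, and plain integers stand in for Velox types.

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for a column index type; not taken from the PR.
using ColumnIndex = uint32_t;

// Hypothetical container for the two pieces described above.
struct SerializerLayout {
  // Distinct input channels in first-use order. Only these columns would be
  // handed to the serializer.
  std::vector<ColumnIndex> uniqueInputChannels;
  // For each output column, its position within uniqueInputChannels.
  std::vector<ColumnIndex> outputToInputChannels;
};

SerializerLayout buildSerializerLayout(
    const std::vector<ColumnIndex>& outputChannels) {
  SerializerLayout layout;
  // Maps an input channel to its position in uniqueInputChannels.
  std::unordered_map<ColumnIndex, ColumnIndex> positionOf;
  layout.outputToInputChannels.reserve(outputChannels.size());
  for (ColumnIndex channel : outputChannels) {
    auto [it, inserted] = positionOf.try_emplace(
        channel, static_cast<ColumnIndex>(layout.uniqueInputChannels.size()));
    if (inserted) {
      // First time this input channel is referenced.
      layout.uniqueInputChannels.push_back(channel);
    }
    layout.outputToInputChannels.push_back(it->second);
  }
  assert(layout.outputToInputChannels.size() == outputChannels.size());
  return layout;
}
```

With output channels {2, 0, 2, 5}, this sketch keeps three unique input columns (2, 0, 5), and the duplicated output column points back at the same entry.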

@xin-zhang2 xin-zhang2 requested a review from yingsu00 May 15, 2026 14:32

namespace facebook::velox::exec {

void OptimizedPartitionedOutput::initializeSerializerLayout() {
Collaborator

Why is the Serializer initialized in the operator? Move the mapping to the Serializer constructor.

});
}

RowVectorPtr OptimizedPartitionedOutput::prepareSerializerInput(
Collaborator

Why do we still need this? The remapping should be done at flush time, but you are still mapping the columns at addInput time. You only need to set up the map once, when the Serializer is created; flushRowChildren should then flush the serialized columns out in the new order.

Suppose the outputType duplicates some input columns: constructing the input RowVector would then make the Serializer repeat the same work. Your previous change in PartitionedVector removed that repeated work, but it was in the wrong place and at the wrong level. Reconstructing the input vector is NOT supposed to happen at all.
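
A rough standalone sketch of the flush-time idea, under stated assumptions: SerializedColumn and flushInOutputOrder are hypothetical names, strings stand in for serialized column buffers, and this is not the actual flushRowChildren() implementation. Each input column is serialized once, and the mapping only decides the order in which buffers are emitted.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in: one already-serialized buffer per input column.
using SerializedColumn = std::string;

// Emits columns in output order. A duplicated output column points at the
// same buffer, so nothing is serialized twice.
std::vector<const SerializedColumn*> flushInOutputOrder(
    const std::vector<SerializedColumn>& serializedInputColumns,
    const std::vector<size_t>& outputToInputChannels) {
  std::vector<const SerializedColumn*> flushed;
  flushed.reserve(outputToInputChannels.size());
  for (size_t inputChannel : outputToInputChannels) {
    assert(inputChannel < serializedInputColumns.size());
    flushed.push_back(&serializedInputColumns[inputChannel]);
  }
  return flushed;
}
```

The mapping vector would be built once at construction time, so no per-batch input vector needs to be rebuilt in this sketch.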

Member Author

As you suggested, we could set up the mapping in the serializer constructor, pass the input vector directly to the serializer's append(), and then apply the column mapping at flush time.

My concern is that this would require all input columns to be partitioned before flush. While outputType may duplicate or reorder input columns, it may also prune some. Re-constructing the input vector to include only the unique columns referenced by the output columns could help us avoid partitioning unnecessary input columns. Does this seem reasonable to you?
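
To make the pruning concern concrete, here is a small standalone sketch that marks which input columns are referenced by at least one output column. The function name referencedInputColumns is hypothetical and not part of the PR. Columns left unmarked are the ones that would not need to be partitioned, wherever that check ends up living.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Marks the input columns referenced by at least one output column.
// Hypothetical helper for illustration only.
std::vector<bool> referencedInputColumns(
    size_t numInputColumns,
    const std::vector<size_t>& outputChannels) {
  std::vector<bool> referenced(numInputColumns, false);
  for (size_t channel : outputChannels) {
    assert(channel < numInputColumns);
    referenced[channel] = true;
  }
  return referenced;
}
```

For four input columns and output channels {3, 1, 3}, only columns 1 and 3 are marked, so columns 0 and 2 could be skipped.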
