diff --git a/velox/exec/OptimizedPartitionedOutput.cpp b/velox/exec/OptimizedPartitionedOutput.cpp index 0ca9a957a8c..d9983a18cbd 100644 --- a/velox/exec/OptimizedPartitionedOutput.cpp +++ b/velox/exec/OptimizedPartitionedOutput.cpp @@ -16,6 +16,8 @@ #include "velox/exec/OptimizedPartitionedOutput.h" +#include + #include "velox/exec/HashPartitionFunction.h" #include "velox/exec/SerializedPage.h" #include "velox/exec/Task.h" @@ -67,12 +69,15 @@ OptimizedPartitionedOutput::OptimizedPartitionedOutput( operatorCtx_->driverCtx()->queryConfig().shuffleCompressionKind()); options.minCompressionRatio = 0.8; + initializeSerializerLayout(); + serializer_ = std::make_unique< serializer::presto::PrestoIterativePartitioningSerializer>( - inputType_, + outputType_, numDestinations_, options, pool_, + serializerInputByOutput_, [bufferManager = bufferManager_]() -> std::unique_ptr { auto lockedBufferManager = bufferManager.lock(); @@ -87,7 +92,10 @@ void OptimizedPartitionedOutput::addInput(RowVectorPtr input) { !replicateNullsAndAny_, "replicateNullsAndAny is not yet supported by OptimizedPartitionedOutput"); - if (serializer_->estimateBytesAfterAppend(input) > maxOutputBufferBytes_) { + auto serializerInput = prepareSerializerInput(input); + + if (serializer_->estimateBytesAfterAppend(serializerInput) > + maxOutputBufferBytes_) { flush(); } @@ -105,7 +113,7 @@ void OptimizedPartitionedOutput::addInput(RowVectorPtr input) { } } - serializer_->append(input, partitions_); + serializer_->append(serializerInput, partitions_); auto lockedStats = stats_.wlock(); ++numAppends_; @@ -157,6 +165,72 @@ bool OptimizedPartitionedOutput::isFinished() { return finished_; } +void OptimizedPartitionedOutput::initializeSerializerLayout() { + if (outputType_->size() == 0 || outputChannels_.empty()) { + serializerInputType_ = outputType_; + return; + } + + std::unordered_map outputToSerializerInput; + outputToSerializerInput.reserve(outputChannels_.size()); + + std::vector names; + std::vector 
types; + names.reserve(outputChannels_.size()); + types.reserve(outputChannels_.size()); + serializerInputByOutput_.reserve(outputChannels_.size()); + + for (const auto outputChannel : outputChannels_) { + auto it = outputToSerializerInput.find(outputChannel); + if (it == outputToSerializerInput.end()) { + const auto serializerInputChannel = + static_cast(serializerInputChannels_.size()); + serializerInputChannels_.push_back(outputChannel); + names.push_back(inputType_->nameOf(outputChannel)); + types.push_back(inputType_->childAt(outputChannel)); + it = + outputToSerializerInput.emplace(outputChannel, serializerInputChannel) + .first; + } + serializerInputByOutput_.push_back(it->second); + } + + serializerInputType_ = ROW(std::move(names), std::move(types)); +} + +RowVectorPtr OptimizedPartitionedOutput::prepareSerializerInput( + const RowVectorPtr& input) const { + VELOX_CHECK_NOT_NULL(input); + + if (serializerInputType_->size() == 0) { + return std::make_shared( + input->pool(), + serializerInputType_, + nullptr /*nulls*/, + input->size(), + std::vector{}); + } + + if (serializerInputChannels_.empty()) { + input->loadedVector(); + return input; + } + + std::vector serializerInputColumns; + serializerInputColumns.reserve(serializerInputChannels_.size()); + for (auto channel : serializerInputChannels_) { + auto loadedChild = BaseVector::loadedVectorShared(input->childAt(channel)); + serializerInputColumns.push_back(loadedChild); + } + + return std::make_shared( + input->pool(), + serializerInputType_, + nullptr /*nulls*/, + input->size(), + std::move(serializerInputColumns)); +} + void OptimizedPartitionedOutput::flush() { const auto flushedBytes = serializer_->bytesBuffered(); const auto flushedRows = serializer_->rowsBuffered(); diff --git a/velox/exec/OptimizedPartitionedOutput.h b/velox/exec/OptimizedPartitionedOutput.h index 0f9dd2e2b47..78ddcaf4a6f 100644 --- a/velox/exec/OptimizedPartitionedOutput.h +++ b/velox/exec/OptimizedPartitionedOutput.h @@ -55,6 
+55,15 @@ class OptimizedPartitionedOutput : public Operator { bool isFinished() override; private: + /// Computes the serializer input columns and the mapping from output columns + /// to serializer input columns. + void initializeSerializerLayout(); + + /// Builds the RowVector consumed by the serializer. When the output layout + /// has duplicated columns, this projects only the distinct columns and + /// leaves duplication to flush time. + RowVectorPtr prepareSerializerInput(const RowVectorPtr& input) const; + /// Serializes all buffered rows into Presto pages and enqueues each page /// into the output buffer manager. All destinations are always enqueued; /// sets blockingReason_ and records a future if the output buffer is full. @@ -62,11 +71,9 @@ class OptimizedPartitionedOutput : public Operator { void flush(); const std::string taskId_; - /// Input row type; also used as output type (column reordering not yet - /// applied). const RowTypePtr inputType_; const std::vector keyChannels_; - /// Non-empty when the output column order differs from the input. + /// Non-empty when the output layout differs from the input. const std::vector outputChannels_; const int32_t numDestinations_; @@ -78,12 +85,23 @@ class OptimizedPartitionedOutput : public Operator { const int64_t maxOutputBufferBytes_; velox::memory::MemoryPool* pool_; + /// Computes per-row partition assignments. Null when numDestinations_ == 1. std::unique_ptr partitionFunction_; /// Reusable buffer for per-row partition assignments. std::vector partitions_; + std::unique_ptr serializer_; + /// Row type passed to serializer_->append(). It only includes distinct + /// columns from the output layout. + RowTypePtr serializerInputType_; + /// Input channels that make up the serializer input type. Empty if the output + /// layout is the same as the input. + std::vector serializerInputChannels_; + /// For each output column index, store the corresponding serializer input + /// column. 
+ std::vector serializerInputByOutput_; BlockingReason blockingReason_{BlockingReason::kNotBlocked}; ContinueFuture future_; diff --git a/velox/exec/tests/OptimizedPartitionedOutputTest.cpp b/velox/exec/tests/OptimizedPartitionedOutputTest.cpp index af9f272e062..ed9fa875624 100644 --- a/velox/exec/tests/OptimizedPartitionedOutputTest.cpp +++ b/velox/exec/tests/OptimizedPartitionedOutputTest.cpp @@ -244,6 +244,64 @@ class OptimizedPartitionedOutputTest : public OperatorTestBase { return result; } + RowTypePtr outputTypeForLayout( + const RowTypePtr& inputType, + const std::vector& outputLayout) { + if (outputLayout.empty()) { + return inputType; + } + + std::vector types; + types.reserve(outputLayout.size()); + for (const auto& name : outputLayout) { + types.push_back(inputType->findChild(name)); + } + return ROW(outputLayout, std::move(types)); + } + + RowVectorPtr buildOutput( + const RowVectorPtr& input, + const std::vector& outputLayout) { + const auto inputType = asRowType(input->type()); + const auto outputType = outputTypeForLayout(inputType, outputLayout); + + std::vector columns; + columns.reserve(outputLayout.size()); + for (const auto& name : outputLayout) { + columns.push_back(input->childAt(inputType->getChildIdx(name))); + } + return std::make_shared( + input->pool(), outputType, nullptr, input->size(), std::move(columns)); + } + + /// Sorts a vector by value for order-independent comparison. Returns a + /// dictionary vector with rows sorted in ascending order. 
+ VectorPtr canonicalize(const VectorPtr& vector) { + const auto numRows = vector->size(); + auto indices = makeIndices(numRows, [](auto i) { return i; }); + auto* data = indices->asMutable(); + std::stable_sort(data, data + numRows, [&](auto a, auto b) { + return vector->compare(vector.get(), a, b) < 0; + }); + return BaseVector::wrapInDictionary(nullptr, indices, numRows, vector); + } + + /// Builds a RowVector by gathering rows from inputBatches at the given + /// (batchIdx, rowIdx) positions. Used to construct the per-partition expected + /// RowVector. + RowVectorPtr gatherRows( + const std::vector& batches, + const std::vector>& rowList, + const RowTypePtr& rowType) { + const auto numRows = static_cast(rowList.size()); + auto result = std::static_pointer_cast( + BaseVector::create(rowType, numRows, pool())); + for (vector_size_t r = 0; r < numRows; ++r) { + result->copy(batches[rowList[r].first].get(), r, rowList[r].second, 1); + } + return result; + } + int64_t getIntRuntimeStat(Task* task, const std::string& statName) { const auto taskStats = task->taskStats(); const auto& runtimeStats = @@ -264,14 +322,34 @@ class OptimizedPartitionedOutputTest : public OperatorTestBase { int numPartitions, std::unordered_map extraConfig = {}, std::chrono::seconds timeout = std::chrono::seconds{30}) { + return runPartitionedOutputWithLayout( + taskId, + inputBatches, + partitionKeys, + numPartitions, + {}, + std::move(extraConfig), + timeout); + } + + PartitionedOutputResult runPartitionedOutputWithLayout( + const std::string& taskId, + const std::vector& inputBatches, + const std::vector& partitionKeys, + int numPartitions, + const std::vector& outputLayout, + std::unordered_map extraConfig = {}, + std::chrono::seconds timeout = std::chrono::seconds{30}) { VELOX_CHECK(!inputBatches.empty()); const auto rowType = std::dynamic_pointer_cast(inputBatches[0]->type()); + const auto outputType = outputTypeForLayout(rowType, outputLayout); - auto plan = PlanBuilder() - 
.values(inputBatches) - .partitionedOutput(partitionKeys, numPartitions) - .planNode(); + auto plan = + PlanBuilder() + .values(inputBatches) + .partitionedOutput(partitionKeys, numPartitions, outputLayout) + .planNode(); auto task = Task::create( taskId, @@ -306,7 +384,7 @@ class OptimizedPartitionedOutputTest : public OperatorTestBase { if (result.pageCounts[p] > 0) { ++result.numNonEmptyPartitions; } - result.rowCounts[p] = concatPages(result.pages[p], rowType)->size(); + result.rowCounts[p] = concatPages(result.pages[p], outputType)->size(); } result.numAppends = getIntRuntimeStat(task.get(), "numAppends"); @@ -446,34 +524,6 @@ class OptimizedPartitionedOutputParamTest return makeRowVector(names, vecs); } - /// Sorts a vector by value for order-independent comparison. Returns a - /// dictionary vector with rows sorted in ascending order. - VectorPtr canonicalize(const VectorPtr& vector) { - const auto numRows = vector->size(); - auto indices = makeIndices(numRows, [](auto i) { return i; }); - auto* data = indices->asMutable(); - std::stable_sort(data, data + numRows, [&](auto a, auto b) { - return vector->compare(vector.get(), a, b) < 0; - }); - return BaseVector::wrapInDictionary(nullptr, indices, numRows, vector); - } - - /// Builds a RowVector by gathering rows from inputBatches at the given - /// (batchIdx, rowIdx) positions. Used to construct the per-partition expected - /// RowVector. - RowVectorPtr gatherRows( - const std::vector& batches, - const std::vector>& rowList, - const RowTypePtr& rowType) { - const auto numRows = static_cast(rowList.size()); - auto result = std::static_pointer_cast( - BaseVector::create(rowType, numRows, pool())); - for (vector_size_t r = 0; r < numRows; ++r) { - result->copy(batches[rowList[r].first].get(), r, rowList[r].second, 1); - } - return result; - } - /// Verifies that the deserialized pages for each partition exactly match the /// rows from inputBatches that were routed to that partition. 
Both expected /// and actual rows are sorted (canonicalized) before comparison to allow @@ -922,4 +972,65 @@ TEST_F(OptimizedPartitionedOutputTest, replicateNullsAndAnyUnsupported) { "replicateNullsAndAny is not yet supported by OptimizedPartitionedOutput")); } +TEST_F(OptimizedPartitionedOutputTest, outputLayout) { + auto input = makeRowVector( + {"p1", "v1", "v2", "unused"}, + {makeFlatVector({0, 1, 2, 3, 4, 5, 6, 7}), + makeFlatVector({10, 11, 12, 13, 14, 15, 16, 17}), + makeFlatVector({20, 21, 22, 23, 24, 25, 26, 27}), + makeFlatVector({30, 31, 32, 33, 34, 35, 36, 37})}); + auto inputCopy = + std::static_pointer_cast(BaseVector::copy(*input, pool())); + + const std::vector outputLayout = {"v2", "v1"}; + const auto inputType = asRowType(input->type()); + const auto outputType = outputTypeForLayout(inputType, outputLayout); + auto expected = buildOutput(inputCopy, outputLayout); + + auto result = runPartitionedOutputWithLayout( + "local://test-optimized-output-layout", {input}, {}, 1, outputLayout); + + auto actual = concatPages(result.pages[0], outputType); + velox::test::assertEqualVectors(expected, actual); +} + +TEST_F(OptimizedPartitionedOutputTest, duplicateOutputColumns) { + constexpr int kNumPartitions = 4; + auto input = makeRowVector( + {"p1", "v1"}, + {makeFlatVector({0, 1, 2, 3, 0, 1, 2, 3}), + makeFlatVector({10, 11, 12, 13, 14, 15, 16, 17})}); + auto inputCopy = + std::static_pointer_cast(BaseVector::copy(*input, pool())); + const std::vector outputLayout = {"v1", "v1"}; + const auto inputType = asRowType(input->type()); + const auto outputType = outputTypeForLayout(inputType, outputLayout); + auto output = buildOutput(inputCopy, outputLayout); + + auto result = runPartitionedOutputWithLayout( + "local://test-optimized-output-layout-duplicated-columns", + {input}, + {"p1"}, + kNumPartitions, + outputLayout); + + std::vector assignments(inputCopy->size()); + auto partitionFn = std::make_unique( + false, kNumPartitions, inputType, std::vector{0}); + 
partitionFn->partition(*inputCopy, assignments); + + std::vector>> expectedRows(kNumPartitions); + for (vector_size_t i = 0; i < assignments.size(); ++i) { + expectedRows[assignments[i]].emplace_back(0, i); + } + + for (int p = 0; p < kNumPartitions; ++p) { + auto expected = gatherRows({output}, expectedRows[p], outputType); + auto actual = concatPages(result.pages[p], outputType); + ASSERT_EQ(expected->size(), actual->size()) << "partition " << p; + velox::test::assertEqualVectors( + canonicalize(expected), canonicalize(actual)); + } +} + } // namespace facebook::velox::exec::test diff --git a/velox/serializers/PrestoIterativePartitioningSerializer.cpp b/velox/serializers/PrestoIterativePartitioningSerializer.cpp index c7ccdbf652a..533b8d6bb75 100644 --- a/velox/serializers/PrestoIterativePartitioningSerializer.cpp +++ b/velox/serializers/PrestoIterativePartitioningSerializer.cpp @@ -313,7 +313,8 @@ std::unique_ptr ColumnBufferState::create( type->kind()); default: VELOX_UNSUPPORTED( - "Unsupported type kind for createColumnBufferState: {}", type->kind()); + "Unsupported type kind for createColumnBufferState: {}", + type->kind()); } } @@ -337,15 +338,20 @@ class BufferState { const RowTypePtr& type, uint32_t numPartitions); - void append(const PartitionedVectorPtr& partitionedVector) { + void append( + const PartitionedVectorPtr& partitionedVector, + const std::vector& outputToInputChannels) { auto rowVector = std::dynamic_pointer_cast(partitionedVector); VELOX_CHECK_NOT_NULL(rowVector); rowsBuffered_ += partitionedVector->baseVector()->size(); - for (auto column = 0; column < children_.size(); ++column) { - children_[column]->append(rowVector->childAt(column)); + for (column_index_t column = 0; column < children_.size(); ++column) { + const auto inputColumn = outputToInputChannels.empty() + ? 
column + : outputToInputChannels[column]; + children_[column]->append(rowVector->childAt(inputColumn)); } for (auto p = 0; p < numPartitions_; ++p) { @@ -425,20 +431,26 @@ std::unique_ptr BufferState::create( } PrestoIterativePartitioningSerializer::PrestoIterativePartitioningSerializer( - RowTypePtr inputType, + RowTypePtr outputType, uint32_t numPartitions, const SerdeOpts& opts, memory::MemoryPool* pool, + std::vector outputToInputChannels, std::function()> listenerFactory) - : type_(std::move(inputType)), + : outputType_(std::move(outputType)), + outputToInputChannels_(std::move(outputToInputChannels)), numPartitions_(numPartitions), opts_(opts), pool_(pool), listenerFactory_(std::move(listenerFactory)), - numColumns_(type_->size()), - bufferState_(BufferState::create(type_, numPartitions_)) { + numColumns_(outputType_->size()), + bufferState_(BufferState::create(outputType_, numPartitions_)) { VELOX_CHECK_GT(numPartitions_, 0); VELOX_CHECK_NOT_NULL(pool_); + VELOX_CHECK( + outputToInputChannels_.empty() || + outputToInputChannels_.size() == outputType_->size(), + "outputToInputChannels size must match output column count"); } PrestoIterativePartitioningSerializer:: @@ -457,9 +469,41 @@ void PrestoIterativePartitioningSerializer::clear() { bufferState_->clear(); } +void PrestoIterativePartitioningSerializer::validateOutputInputMapping( + const RowVectorPtr& input) const { + const auto numInputColumns = input->childrenSize(); + for (column_index_t outputColumn = 0; outputColumn < numColumns_; + ++outputColumn) { + const auto inputColumn = outputToInputChannel(outputColumn); + VELOX_CHECK_LT( + inputColumn, + numInputColumns, + "Output column {} maps to invalid input column {}", + outputColumn, + inputColumn); + + const auto& child = input->childAt(inputColumn); + VELOX_CHECK_NOT_NULL( + child, + "Output column {} maps to null input column {}", + outputColumn, + inputColumn); + + const auto type = outputType_->childAt(outputColumn); + VELOX_CHECK( + 
child->type()->equivalent(*type), + "Output column {} expects {}, got {} from input column {}", + outputColumn, + type->toString(), + child->type()->toString(), + inputColumn); + } +} + int64_t PrestoIterativePartitioningSerializer::estimateBytesAfterAppend( const RowVectorPtr& input) const { VELOX_CHECK_NOT_NULL(input); + validateOutputInputMapping(input); if (input->size() == 0) { return bytesBuffered(); @@ -475,8 +519,17 @@ int64_t PrestoIterativePartitioningSerializer::estimateBytesAfterAppend( auto estimatedBytes = bufferState_->bytesBuffered() + numNewPartitions * (kHeaderSize + 4); - for (auto column = 0; column < numColumns_; ++column) { - const auto& columnType = type_->childAt(column); + // Cache per input column. If multiple output columns map to the same input + // column, reuse the already computed incremental bytes. + std::vector> estimatedIncrementalBytes( + input->childrenSize()); + for (column_index_t column = 0; column < numColumns_; ++column) { + const auto inputColumn = outputToInputChannel(column); + if (estimatedIncrementalBytes[inputColumn].has_value()) { + estimatedBytes += *estimatedIncrementalBytes[inputColumn]; + continue; + } + const auto& columnType = outputType_->childAt(column); if (columnType->isUnknown()) { VELOX_UNSUPPORTED( "Unsupported type kind for " @@ -484,7 +537,7 @@ int64_t PrestoIterativePartitioningSerializer::estimateBytesAfterAppend( columnType->kind()); } else if (columnType->isFixedWidth()) { const auto* columnState = bufferState_->children()[column].get(); - const auto inputNulls = countNulls(*input->childAt(column)); + const auto inputNulls = countNulls(*input->childAt(inputColumn)); const auto partitionsWithNulls = std::min( bufferState_->numNonEmptyPartitions() + numNewPartitions, columnState->numPartitionsWithNulls() + inputNulls.value_or(numRows)); @@ -493,12 +546,13 @@ int64_t PrestoIterativePartitioningSerializer::estimateBytesAfterAppend( auto nullBitmapBytesBuffered = columnState->nullBitmapBytesBuffered(); 
VELOX_DCHECK_GE(nullBitmapBytes, nullBitmapBytesBuffered); - estimatedBytes += numNewPartitions * + estimatedIncrementalBytes[inputColumn] = numNewPartitions * simpleColumnBytes(columnType, 0, 0) + // header growth nullBitmapBytes - nullBitmapBytesBuffered + // null bitmap growth static_cast(numRows - inputNulls.value_or(0)) * fixedTypeWidth(columnType->kind()); // value bytes growth + estimatedBytes += *estimatedIncrementalBytes[inputColumn]; } else { switch (columnType->kind()) { case TypeKind::VARCHAR: @@ -530,6 +584,8 @@ void PrestoIterativePartitioningSerializer::append( partitions.size(), "partitions.size() must equal input->size()"); + validateOutputInputMapping(input); + if (input->size() == 0) { return; } @@ -542,7 +598,7 @@ void PrestoIterativePartitioningSerializer::append( ctx, pool_); - bufferState_->append(partitionedRowVector); + bufferState_->append(partitionedRowVector, outputToInputChannels_); partitionedRowVectors_.push_back(std::move(partitionedRowVector)); } @@ -575,7 +631,7 @@ PrestoIterativePartitioningSerializer::flushUncompressed() { nonEmptyPartitions.push_back(p); } } - const auto& rowSchema = type_->asRow(); + const auto& rowSchema = outputType_->asRow(); // 2. Create per-partition listeners first so the codec mask can be derived // from whether the factory actually produced a listener. 
The factory may @@ -599,7 +655,6 @@ PrestoIterativePartitioningSerializer::flushUncompressed() { std::vector beginStreamPositions(numPartitions_); for (uint32_t p : nonEmptyPartitions) { - listeners[p] = std::make_unique(); outputStreams[p] = std::make_unique( *pool_, listeners[p].get(), bufferState_->bytesPerPartition()[p]); rawOutputStreams[p] = outputStreams[p].get(); @@ -677,7 +732,8 @@ void PrestoIterativePartitioningSerializer::flushRowChildren( const auto& partitionedRowVector = std::dynamic_pointer_cast(partitionedVector); VELOX_DCHECK_NOT_NULL(partitionedRowVector.get()); - column.push_back(partitionedRowVector->childAt(col)); + column.push_back( + partitionedRowVector->childAt(outputToInputChannel(col))); } flushColumn( diff --git a/velox/serializers/PrestoIterativePartitioningSerializer.h b/velox/serializers/PrestoIterativePartitioningSerializer.h index 88abeb49e5a..8ab7d31dc7e 100644 --- a/velox/serializers/PrestoIterativePartitioningSerializer.h +++ b/velox/serializers/PrestoIterativePartitioningSerializer.h @@ -41,6 +41,19 @@ class BufferState; /// internal state so the serializer can be reused for the next cycle. class PrestoIterativePartitioningSerializer { public: + PrestoIterativePartitioningSerializer( + RowTypePtr outputType, + uint32_t numPartitions, + const SerdeOpts& opts, + memory::MemoryPool* pool) + : PrestoIterativePartitioningSerializer( + std::move(outputType), + numPartitions, + opts, + pool, + {}, + nullptr) {} + /// Constructs the serializer. If `listenerFactory` is non-null it is called /// once per non-empty partition on each flush to create an /// OutputStreamListener that accumulates the CRC32 checksum; the checksum @@ -49,10 +62,29 @@ class PrestoIterativePartitioningSerializer { /// which matches the behavior of kNormal PartitionedOutput when /// OutputBufferManager has no listener factory set. 
PrestoIterativePartitioningSerializer( - RowTypePtr inputType, + RowTypePtr outputType, uint32_t numPartitions, const SerdeOpts& opts, memory::MemoryPool* pool, + std::function()> listenerFactory) + : PrestoIterativePartitioningSerializer( + std::move(outputType), + numPartitions, + opts, + pool, + {}, + std::move(listenerFactory)) {} + + /// Constructs the serializer with an explicit output-column to input-column + /// mapping. `outputToInputChannels[i]` indicates which child of the RowVector + /// passed to append() should be serialized for output column i. When empty, + /// output column i uses input child i. + PrestoIterativePartitioningSerializer( + RowTypePtr outputType, + uint32_t numPartitions, + const SerdeOpts& opts, + memory::MemoryPool* pool, + std::vector outputToInputChannels, std::function()> listenerFactory = nullptr); @@ -84,6 +116,14 @@ class PrestoIterativePartitioningSerializer { vector_size_t rowsBuffered() const; private: + void validateOutputInputMapping(const RowVectorPtr&) const; + + column_index_t outputToInputChannel(column_index_t outputColumn) const { + return outputToInputChannels_.empty() + ? outputColumn + : outputToInputChannels_[outputColumn]; + } + std::map, vector_size_t>> flushUncompressed(); std::map, vector_size_t>> @@ -170,13 +210,15 @@ class PrestoIterativePartitioningSerializer { const std::vector& nonEmptyPartitions, const std::vector& outputStreams) const; - RowTypePtr type_; + RowTypePtr outputType_; + std::vector outputToInputChannels_; uint32_t numPartitions_; SerdeOpts opts_; memory::MemoryPool* pool_; + std::function()> listenerFactory_; - /// Number of top-level columns in `type_`. + /// Number of top-level columns in `outputType_`. 
uint32_t numColumns_{0}; std::vector partitionedRowVectors_; diff --git a/velox/serializers/tests/PrestoIterativePartitioningSerializerTest.cpp b/velox/serializers/tests/PrestoIterativePartitioningSerializerTest.cpp index 87a81ff996a..4116632f762 100644 --- a/velox/serializers/tests/PrestoIterativePartitioningSerializerTest.cpp +++ b/velox/serializers/tests/PrestoIterativePartitioningSerializerTest.cpp @@ -21,6 +21,7 @@ #include #include "velox/common/base/BitUtil.h" +#include "velox/common/base/tests/GTestUtils.h" #include "velox/serializers/PrestoIterativePartitioningSerializer.h" #include "velox/serializers/PrestoSerializerSerializationUtils.h" @@ -592,6 +593,69 @@ TEST_F(PrestoIterativePartitioningSerializerTest, bytesBufferedNullFlagGrowth) { EXPECT_EQ(bytesBuffered, totalFlushedBytes(ioBufs)); } +// A flush time output mapping serializes one input column into multiple output +// columns. +TEST_F( + PrestoIterativePartitioningSerializerTest, + duplicateOutputColumnAtFlush) { + auto outputType = ROW({"v1", "v2"}, {BIGINT(), BIGINT()}); + SerdeOpts opts; + auto serializer = std::make_unique( + outputType, 2, opts, pool_.get(), std::vector{0, 0}); + + serializer->append( + makeRowVector({"v"}, {makeFlatVector({10, 11, 12, 13})}), + {0, 1, 0, 1}); + + auto ioBufs = serializer->flush(); + ASSERT_EQ(ioBufs.size(), 2); + + auto r0 = deserialize(*ioBufs.at(0).first, outputType); + auto r1 = deserialize(*ioBufs.at(1).first, outputType); + + ASSERT_EQ(r0->size(), 2); + ASSERT_EQ(r1->size(), 2); + + EXPECT_EQ(sortedValues(r0, 0), (std::vector{10, 12})); + EXPECT_EQ(sortedValues(r0, 1), (std::vector{10, 12})); + EXPECT_EQ(sortedValues(r1, 0), (std::vector{11, 13})); + EXPECT_EQ(sortedValues(r1, 1), (std::vector{11, 13})); +} + +TEST_F( + PrestoIterativePartitioningSerializerTest, + outputInputMappingOutOfRange) { + auto outputType = ROW({"v1", "v2"}, {BIGINT(), BIGINT()}); + SerdeOpts opts; + auto serializer = std::make_unique( + outputType, 2, opts, pool_.get(), 
std::vector{0, 1}); + + VELOX_ASSERT_THROW( + serializer->append( + makeRowVector({"v"}, {makeFlatVector({10, 11})}), {0, 1}), + "Output column 1 maps to invalid input column 1"); +} + +TEST_F( + PrestoIterativePartitioningSerializerTest, + outputInputMappingTypeMismatch) { + auto outputType = ROW({"v1", "v2"}, {BIGINT(), BIGINT()}); + SerdeOpts opts; + auto serializer = std::make_unique( + outputType, 2, opts, pool_.get(), std::vector{0, 1}); + + VELOX_ASSERT_THROW( + serializer->append( + makeRowVector( + {"v1", "v2"}, + { + makeFlatVector({10, 11}), + makeFlatVector({12, 13}), + }), + {0, 1}), + "Output column 1 expects BIGINT, got INTEGER from input column 1"); +} + TEST_F( PrestoIterativePartitioningSerializerTest, estimateBytesAfterAppendExactForSinglePartition) {