From 13173eac747172aa7116b5a6b9d74419ebf76082 Mon Sep 17 00:00:00 2001 From: yingsu00 Date: Mon, 12 Jan 2026 22:11:47 -0800 Subject: [PATCH 01/10] feat: Introducing PartitionedVector This commit introduces `PartitionedVector` - a low-level execution abstraction that provides an in-place, partition-aware layout of a vector based on per-row partition IDs. 1. **In-place rearrangement**: Rearrange vector data in memory without creating multiple copies 2. **Buffer reuse**: Allow reuse of temporary buffers across multiple partitioning operations 3. **Minimal abstraction**: Similar to `DecodedVector`, focus on efficient execution rather than operator semantics 4. **Thread-unsafe by design**: Optimized for single-threaded execution contexts For more information please see https://github.com/IBM/velox/issues/1703 Alchemy-item: (ID = 1150) Introducing PartitionedVector commit 1/1 - 960f41b03895ba2fc3ea3853daa035c411af549c --- velox/vector/CMakeLists.txt | 1 + velox/vector/PartitionedVector.cpp | 343 ++++++++++++++++++ velox/vector/PartitionedVector.h | 244 +++++++++++++ velox/vector/tests/CMakeLists.txt | 1 + velox/vector/tests/PartitionedVectorTest.cpp | 168 +++++++++ velox/vector/tests/utils/CMakeLists.txt | 2 +- .../tests/utils/PartitionedVectorTestBase.cpp | 126 +++++++ .../tests/utils/PartitionedVectorTestBase.h | 42 +++ 8 files changed, 926 insertions(+), 1 deletion(-) create mode 100644 velox/vector/PartitionedVector.cpp create mode 100644 velox/vector/PartitionedVector.h create mode 100644 velox/vector/tests/PartitionedVectorTest.cpp create mode 100644 velox/vector/tests/utils/PartitionedVectorTestBase.cpp create mode 100644 velox/vector/tests/utils/PartitionedVectorTestBase.h diff --git a/velox/vector/CMakeLists.txt b/velox/vector/CMakeLists.txt index 10769cc214d..cc8ddfa8b8f 100644 --- a/velox/vector/CMakeLists.txt +++ b/velox/vector/CMakeLists.txt @@ -21,6 +21,7 @@ velox_add_library( FlatMapVector.cpp FlatVector.cpp LazyVector.cpp + PartitionedVector.cpp SelectivityVector.cpp SequenceVector.cpp SimpleVector.cpp diff --git a/velox/vector/PartitionedVector.cpp b/velox/vector/PartitionedVector.cpp new file mode 100644 index 00000000000..43fb7fb5d53 --- /dev/null +++ b/velox/vector/PartitionedVector.cpp @@ -0,0 +1,343 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "velox/vector/PartitionedVector.h" + +#include "velox/vector/FlatVector.h" + +namespace facebook::velox { + +using Byte = uint8_t; +using BitIndex = uint8_t; + +namespace { + +inline void countPartitionSizes( + const std::vector& partitions, + vector_size_t* rowCounts) { + VELOX_DCHECK_NOT_NULL(rowCounts); + + for (vector_size_t i = 0; i < partitions.size(); i++) { + rowCounts[partitions[i]]++; + } +} + +inline void prefixSum(vector_size_t* offsets, uint32_t numPartitions) { + for (uint32_t i = 1; i < numPartitions; i++) { + offsets[i] += offsets[i - 1]; + } +} + +inline void calculateOffsets( + const std::vector& partitions, + uint32_t numPartitions, + vector_size_t* endPartitionOffsets) { + VELOX_DCHECK_NOT_NULL(endPartitionOffsets); + + if (numPartitions > 1) { + std::fill_n(endPartitionOffsets, numPartitions, 0); + countPartitionSizes(partitions, endPartitionOffsets); + prefixSum(endPartitionOffsets, numPartitions); + } else { + endPartitionOffsets[0] = static_cast(partitions.size()); + } +} + +// endPartitionOffsets is an array of length numPartitions where each entry i is +// the exclusive end position of partition i. cursorPartitionOffsets is +// initialized such that cursorPartitionOffsets[0] = 0 and for i>0, +// cursorPartitionOffsets[i] = endPartitionOffsets[i-1], i.e., the inclusive +// begin positions. +void initializeCursorPartitionOffsets( + BufferPtr& cursorPartitionOffsets, + const BufferPtr& endPartitionOffsets, + uint32_t numPartitions, + velox::memory::MemoryPool* pool) { + VELOX_DCHECK_NOT_NULL(endPartitionOffsets); + VELOX_DCHECK_EQ( + endPartitionOffsets->size(), numPartitions * sizeof(vector_size_t)); + + ensureCapacity(cursorPartitionOffsets, numPartitions, pool); + cursorPartitionOffsets->asMutable()[0] = 0; + std::memcpy( + &cursorPartitionOffsets->asMutable()[1], + endPartitionOffsets->as(), + sizeof(vector_size_t) * (numPartitions - 1)); + cursorPartitionOffsets->setSize(numPartitions * sizeof(vector_size_t)); +} + +// In-place partitioning algorithm for fixed-width values +// This algorithm rearranges elements so that each element ends up in its target +// partition by repeatedly swapping elements until the current element belongs +// to the current partition +template +void partitionFixedWidthValuesInPlace( + T* values, + const std::vector& partitions, + uint32_t numPartitions, + vector_size_t* cursorPartitionOffsets, + const vector_size_t* endPartitionOffsets) { + VELOX_DCHECK_NOT_NULL(values); + VELOX_DCHECK_NOT_NULL(cursorPartitionOffsets); + VELOX_DCHECK_NOT_NULL(endPartitionOffsets); + + for (auto currentPartition = 0; currentPartition < numPartitions; + currentPartition++) { + vector_size_t& offset = cursorPartitionOffsets[currentPartition]; + vector_size_t endOffset = endPartitionOffsets[currentPartition]; + + while (offset < endOffset) { + uint32_t targetPartition = partitions[offset]; + + while (targetPartition != currentPartition) { + auto destinationOffset = cursorPartitionOffsets[targetPartition]++; + std::swap(values[destinationOffset], values[offset]); + targetPartition = partitions[destinationOffset]; + } + offset = ++cursorPartitionOffsets[currentPartition]; + } + } +} + +template +void partitionFixedWidthValues( + BufferPtr& inputBuffer, + const std::vector& partitions, + const BufferPtr& endPartitionOffsets, + uint32_t numPartitions, + PartitionBuildContext& ctx, + velox::memory::MemoryPool* pool) { + VELOX_DCHECK_NOT_NULL(inputBuffer); + VELOX_DCHECK_NOT_NULL(endPartitionOffsets); + + auto input = inputBuffer->asMutable(); + + initializeCursorPartitionOffsets( + ctx.cursorPartitionOffsets, endPartitionOffsets, numPartitions, pool); + + vector_size_t* rawCursorOffsets = + ctx.cursorPartitionOffsets->asMutable(); + const vector_size_t* rawEndOffsets = + endPartitionOffsets->asMutable(); + + partitionFixedWidthValuesInPlace( + input, partitions, numPartitions, rawCursorOffsets, rawEndOffsets); +} + +// Swap two bits between two bytes +void swapBit(Byte& byte1, BitIndex bit1, Byte& byte2, BitIndex bit2) { + // Calculate the difference between the bits + char bitDiff = ((byte1 >> bit1) & 1) ^ ((byte2 >> bit2) & 1); + + // Apply the difference to toggle the bits + byte1 ^= (bitDiff << bit1); + byte2 ^= (bitDiff << bit2); +} + +void partitionBitsInPlace( + Byte* bits, + const std::vector& partitions, + uint32_t numPartitions, + PartitionBuildContext& ctx, + const BufferPtr& endPartitionOffsets, + velox::memory::MemoryPool* pool) { + initializeCursorPartitionOffsets( + ctx.cursorPartitionOffsets, endPartitionOffsets, numPartitions, pool); + + auto rawCursorOffsets = + ctx.cursorPartitionOffsets->asMutable(); + auto rawEndOffsets = endPartitionOffsets->asMutable(); + + for (uint32_t partition = 0; partition < numPartitions; partition++) { + auto& offset = rawCursorOffsets[partition]; + auto endOffset = rawEndOffsets[partition]; + while (offset < endOffset) { + uint32_t p = partitions[offset]; + while (p != partition) { + vector_size_t destinationOffset = rawCursorOffsets[p]++; + + // Calculate the byte address and bit index within the byte for the + // source and destination bits. Since each byte contains 8 bits, we + // divide the offset by 8 to get the byte address and take the modulus + // by 8 to get the bit index within that byte. + vector_size_t destinationAddr = destinationOffset >> 3; + int8_t destinationBitInByte = destinationOffset & 7; + vector_size_t fromAddr = offset >> 3; + int8_t fromBitInByte = offset & 7; + + swapBit( + bits[destinationAddr], + destinationBitInByte, + bits[fromAddr], + fromBitInByte); + p = partitions[destinationOffset]; + } + offset = ++rawCursorOffsets[partition]; + } + } +} + +template +PartitionedVectorPtr createPartitionedFlatVector( + VectorPtr vector, + const std::vector& partitions, + uint32_t numPartitions, + const BufferPtr& endPartitionOffsets, + PartitionBuildContext& ctx, + velox::memory::MemoryPool* pool) { + using T = typename TypeTraits::NativeType; + auto flatVector = std::dynamic_pointer_cast>(vector); + VELOX_CHECK_NOT_NULL(flatVector); + + auto partitionedFlatVector = std::make_shared>( + flatVector, numPartitions, endPartitionOffsets, pool); + + if (numPartitions > 1) { + partitionedFlatVector->partition(partitions, ctx); + } + + return partitionedFlatVector; +} + +} // namespace + +PartitionedVector::~PartitionedVector() = default; + +PartitionedVectorPtr PartitionedVector::create( + const VectorPtr& vector, + const std::vector& partitions, + uint32_t numPartitions, + PartitionBuildContext& ctx, + velox::memory::MemoryPool* pool) { + VELOX_CHECK_NOT_NULL(vector); + VELOX_CHECK_EQ(vector->size(), partitions.size()); + VELOX_CHECK_GT(numPartitions, 0); + VELOX_CHECK_NOT_NULL(pool); + + // Calculate the end offsets for each partition. For example, if there are 3 + // partitions with 2, 3, and 1 rows respectively, then endPartitionOffsets[0] + // = 2, endPartitionOffsets[1] = 5, and endPartitionOffsets[2] = 6. + BufferPtr endPartitionOffsets; + ensureCapacity(endPartitionOffsets, numPartitions, pool); + calculateOffsets( + partitions, + numPartitions, + endPartitionOffsets->asMutable()); + endPartitionOffsets->setSize(numPartitions * sizeof(vector_size_t)); + + auto raw = endPartitionOffsets->as(); + VELOX_DCHECK_EQ(raw[numPartitions - 1], partitions.size()); + + return create( + vector, partitions, numPartitions, endPartitionOffsets, ctx, pool); +} + +PartitionedVectorPtr PartitionedVector::create( + const VectorPtr& vector, + const std::vector& partitions, + uint32_t numPartitions, + const BufferPtr& endPartitionOffsets, + PartitionBuildContext& ctx, + velox::memory::MemoryPool* pool) { + VELOX_CHECK_NOT_NULL(endPartitionOffsets); + VELOX_CHECK_EQ( + endPartitionOffsets->size(), numPartitions * sizeof(vector_size_t)); + + auto encoding = vector->encoding(); + auto typeKind = vector->typeKind(); + + switch (encoding) { + case VectorEncoding::Simple::FLAT: { + auto partitionedFlatVector = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH( + createPartitionedFlatVector, + typeKind, + vector, + partitions, + numPartitions, + endPartitionOffsets, + ctx, + pool); + return partitionedFlatVector; + } + + case VectorEncoding::Simple::ROW: + case VectorEncoding::Simple::ARRAY: + case VectorEncoding::Simple::MAP: + case VectorEncoding::Simple::DICTIONARY: + case VectorEncoding::Simple::BIASED: + case VectorEncoding::Simple::SEQUENCE: + case VectorEncoding::Simple::CONSTANT: + case VectorEncoding::Simple::LAZY: + VELOX_UNSUPPORTED( + "Unsupported vector encoding for PartitionedVector: {}", + mapSimpleToName(encoding)); + default: + VELOX_UNREACHABLE( + "Invalid vector encoding for PartitionedVector: {}", encoding); + } +} + +VectorPtr PartitionedVector::baseVector() const { + return vector_; +} + +std::string PartitionedVector::toString() const { + std::string offsets; + for (vector_size_t i = 0; i < numPartitions_; ++i) { + if (i > 0) { + offsets += ','; + } + offsets += fmt::format("{}", rawEndPartitionOffsets_[i]); + } + + return fmt::format( + "PartitionedVector[numPartitions: {}, offsets: {}]", + numPartitions_, + offsets); +} + +template +void PartitionedFlatVector::partition( + const std::vector& partitions, + PartitionBuildContext& ctx) { + Byte* rawNulls = reinterpret_cast(vector_->mutableRawNulls()); + if (rawNulls) { + partitionBitsInPlace( + rawNulls, partitions, numPartitions_, ctx, endPartitionOffsets_, pool_); + } + + auto valuesBuffer = vector_->as>()->values(); + partitionFixedWidthValues( + valuesBuffer, + partitions, + endPartitionOffsets_, + numPartitions_, + ctx, + pool_); +} + +template +VectorPtr PartitionedFlatVector::partitionAt(uint32_t partition) const { + VELOX_CHECK_LT(partition, numPartitions_); + + vector_size_t beginOffset = + partition == 0 ? 0 : rawEndPartitionOffsets_[partition - 1]; + vector_size_t numRowsInPartition = + rawEndPartitionOffsets_[partition] - beginOffset; + + return vector_->slice(beginOffset, numRowsInPartition); +} + +} // namespace facebook::velox diff --git a/velox/vector/PartitionedVector.h b/velox/vector/PartitionedVector.h new file mode 100644 index 00000000000..8c0983813e9 --- /dev/null +++ b/velox/vector/PartitionedVector.h @@ -0,0 +1,244 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include "velox/vector/BaseVector.h" +#include "velox/vector/ComplexVector.h" + +namespace facebook::velox { + +class PartitionedVector; +using PartitionedVectorPtr = std::shared_ptr; + +namespace { + +// TODO: This was copied from dwio::common::BufferUtil.h. However the vector +// module should not depend on dwio. Move this to a common place +template +inline void ensureCapacity( + BufferPtr& data, + size_t numElements, + velox::memory::MemoryPool* pool, + bool preserveOldData = false, + bool clearBits = false) { + size_t oldSize = 0; + size_t newCapacity = BaseVector::byteSize(numElements); + if (!data) { + data = AlignedBuffer::allocate(numElements, pool); + } else { + oldSize = data->size(); + if (!data->isMutable() || data->capacity() < newCapacity) { + auto newData = AlignedBuffer::allocate(numElements, pool); + if (preserveOldData) { + std::memcpy( + newData->template asMutable(), + data->as(), + oldSize); + } + data = newData; + } + } + + if (clearBits && newCapacity > oldSize) { + std::memset( + (void*)(data->asMutable() + oldSize), + 0L, + newCapacity - oldSize); + } +} + +} // namespace + +/// Construction-time context used to build a PartitionedVector. +/// +/// This struct contains only transient execution context needed during +/// construction. None of the fields here define the logical state of +/// PartitionedVector and none are retained after create(). +/// All fields are only valid during the PartitionedVector::create() call. +struct PartitionBuildContext { + BufferPtr cursorPartitionOffsets = nullptr; + + PartitionBuildContext() = default; +}; + +/// PartitionedVector provides an in-place, partition-aware layout of a vector +/// based on per-row partition IDs. +/// +/// This is a low-level execution abstraction, analogous to DecodedVector: +/// - it owns partitioning metadata (offsets, indices) +/// - it does not encode operator-specific semantics +/// - it is intended to be reused by multiple exec components +/// (aggregation, sorting, shuffle, etc.) +/// +/// The partitioning operation rearranges rows so that rows belonging to the +/// same partition occupy a contiguous range. +/// +/// Thread-safety: +/// This class is NOT thread-safe. All methods must be called from a single +/// thread. Internal buffers are mutated during create(). +class PartitionedVector { + public: + /// Disable default constructor. + PartitionedVector() = delete; + + /// Disable copy constructor and assignment. + PartitionedVector(const PartitionedVector& other) = delete; + PartitionedVector& operator=(const PartitionedVector& other) = delete; + + // Use default move constructor and move assignment operator. + PartitionedVector(PartitionedVector&&) noexcept = default; + PartitionedVector& operator=(PartitionedVector&&) noexcept = default; + + /// Virtual destructor. + virtual ~PartitionedVector(); + + /// Factory method to create a PartitionedVector. This is the main entry point + /// for constructing a PartitionedVector. The partitioning operation + /// rearranges rows in the base vector so that rows belonging to the same + /// partition occupy a contiguous range. + /// + /// Params: + /// - vector: the base vector to be partitioned. This is modified during + /// partitioning, and becomes the underlying vector of the created + /// PartitionedVector. + /// - partitions: a vector of partition IDs for each row in the base vector. + /// The length of this vector must be the same as the number of rows in the + /// base vector. Each entry must be a value between 0 and numPartitions - 1. + /// - numPartitions: the total number of partitions. This must be greater than + /// 0. + /// - ctx: the context object for building the partitioned vector. This + /// contains transient execution context needed during construction, such as + /// intermediate buffers. None of the fields in this context define the + /// logical state of the PartitionedVector, and none are retained after + /// create(). All fields in this context are only valid during the create() + /// call. + /// - pool: the memory pool for allocating any necessary buffers during the + /// creation of the PartitionedVector. + static PartitionedVectorPtr create( + const VectorPtr& vector, + const std::vector& partitions, + uint32_t numPartitions, + PartitionBuildContext& ctx, + velox::memory::MemoryPool* pool); + + /// Returns the underlying vector. + VectorPtr baseVector() const; + + /// Returns the partitioned vector at partition p. If the number of rows in + /// that partition is 0, returns an empty vector. + virtual VectorPtr partitionAt(uint32_t partition) const = 0; + + template + T* as() { + static_assert(std::is_base_of_v); + return dynamic_cast(this); + } + + TypeKind typeKind() const { + return vector_->typeKind(); + } + + vector_size_t* rawPartitionOffsets() { + return rawEndPartitionOffsets_; + } + + virtual const vector_size_t* rawSizes() = 0; + + /// Returns string representation of the value in the specified row. + virtual std::string toString() const; + + protected: + // Internal create method that accepts pre-computed endPartitionOffsets + // buffer. + static PartitionedVectorPtr create( + const VectorPtr& vector, + const std::vector& partitions, + uint32_t numPartitions, + const BufferPtr& partitionOffsetsBuffer, + PartitionBuildContext& ctx, + velox::memory::MemoryPool* pool); + + PartitionedVector( + const VectorPtr& vector, + uint32_t numPartitions, + const BufferPtr& endPartitionOffsets, + velox::memory::MemoryPool* pool) + : vector_(vector), + numPartitions_(numPartitions), + endPartitionOffsets_(endPartitionOffsets), + pool_(pool) { + VELOX_CHECK_NOT_NULL(vector_); + VELOX_CHECK_GT(numPartitions_, 0); + VELOX_CHECK_NOT_NULL(endPartitionOffsets_); + VELOX_CHECK_EQ( + endPartitionOffsets_->size(), numPartitions_ * sizeof(vector_size_t)); + VELOX_CHECK_NOT_NULL(pool_); + + rawEndPartitionOffsets_ = endPartitionOffsets_->asMutable(); + } + + virtual void partition( + const std::vector& partitions, + PartitionBuildContext& ctx) = 0; + + // The base vector that is being partitioned. This is modified during + // partitioning. + VectorPtr vector_; + + // Total number of partitions. This is set at construction and does not change + // during partitioning. It doesn't have const quantifier because we want to + // allow move assignment operator. + uint32_t numPartitions_; + + // The cumulative end row offsets for each partition. For example, if there + // are 3 partitions with 2, 3, and 1 rows respectively, then + // endPartitionOffsets_[0] = 2, endPartitionOffsets_[1] = 5, and + // endPartitionOffsets_[2] = 6. + BufferPtr endPartitionOffsets_; + + // The raw pointer to the endPartitionOffsets_ buffer for easy access during + // partitioning. + vector_size_t* rawEndPartitionOffsets_; + + velox::memory::MemoryPool* pool_; +}; + +using PartitionedVectorPtr = std::shared_ptr; + +template +class PartitionedFlatVector : public PartitionedVector { + public: + PartitionedFlatVector( + const VectorPtr& flatVector, + uint32_t numPartitions, + const BufferPtr& partitionOffsets, + velox::memory::MemoryPool* pool) + : PartitionedVector(flatVector, numPartitions, partitionOffsets, pool) {} + + void partition( + const std::vector& partitions, + PartitionBuildContext& ctx) override; + + VectorPtr partitionAt(uint32_t partition) const override; + + const vector_size_t* rawSizes() override { + VELOX_UNREACHABLE("PartitionedFlatVector does not implement rawSizes()"); + } +}; + +} // namespace facebook::velox diff --git a/velox/vector/tests/CMakeLists.txt b/velox/vector/tests/CMakeLists.txt index 5eb6cf593b6..074530b31fc 100644 --- a/velox/vector/tests/CMakeLists.txt +++ b/velox/vector/tests/CMakeLists.txt @@ -24,6 +24,7 @@ add_executable( IsWritableVectorTest.cpp LazyVectorTest.cpp MayHaveNullsRecursiveTest.cpp + PartitionedVectorTest.cpp SelectivityVectorTest.cpp StringVectorBufferTest.cpp VariantToVectorTest.cpp diff --git a/velox/vector/tests/PartitionedVectorTest.cpp b/velox/vector/tests/PartitionedVectorTest.cpp new file mode 100644 index 00000000000..df5b586ec6a --- /dev/null +++ b/velox/vector/tests/PartitionedVectorTest.cpp @@ -0,0 +1,168 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include + +#include + +#include "vector/tests/utils/VectorTestBase.h" +#include "velox/vector/PartitionedVector.h" +#include "velox/vector/tests/utils/PartitionedVectorTestBase.h" + +namespace facebook::velox::test { + +class PartitioningVectorTest : public testing::TestWithParam, + public test::PartitionedVectorTestBase { + protected: + std::mt19937 gen_ = std::mt19937(std::random_device{}()); + + PartitionBuildContext ctx_; + BufferPtr partitionOffsets_; + + static void SetUpTestCase() { + memory::MemoryManager::testingSetInstance({}); + } + + void testPartitionedVector( + VectorPtr vector, + const std::vector& partitions, + uint32_t numPartitions) { + // Back up the vector before calling PartitionedVector::create() + VectorPtr vectorCopy = BaseVector::copy(*vector); + // Build the expected vector using the reference implementation + std::vector expectedVectors = + partitionVectorByWrapping(vectorCopy, partitions, numPartitions); + + // Initialize buffers needed for PartitionedVector::create() + ensureCapacity( + ctx_.cursorPartitionOffsets, numPartitions, pool_.get()); + + // Calculate the number of values for each partition + std::vector partitionRowCounts(numPartitions, 0); + for (auto partition : partitions) { + partitionRowCounts[partition]++; + } + + // Create the partitioned vector using the actual implementation + auto partitionedVector = PartitionedVector::create( + vector, + partitions, + numPartitions, + // partitionOffsets_, + ctx_, + pool_.get()); + VELOX_CHECK_NOT_NULL(partitionedVector); + + // Extract each partition and compare with expected results + std::vector partitionedVectors; + for (uint32_t i = 0; i < numPartitions; ++i) { + auto partition = partitionedVector->partitionAt(i); + partitionedVectors.push_back(partition); + } + + for (uint32_t i = 0; i < numPartitions; ++i) { + test::assertEqualVectors( + expectedVectors[i], canonicalize(partitionedVectors[i])); + } + } + + void testVectorPartitioning(VectorPtr vector) { + auto numRows = vector->size(); + std::vector partitions(numRows); + + // Test with single partition + std::fill(partitions.begin(), partitions.end(), 0); + auto vectorCopy = BaseVector::copy(*vector, pool_.get()); + testPartitionedVector(vectorCopy, partitions, 1); + + // Test with two partitions + if (vector->size() >= 3) { + for (uint32_t i = 0; i < partitions.size(); ++i) { + partitions[i] = i % 2; + } + vectorCopy = BaseVector::copy(*vector, pool_.get()); + testPartitionedVector(vectorCopy, partitions, 2); + } + + // Test with three partitions + for (uint32_t i = 0; i < partitions.size(); ++i) { + partitions[i] = i % 3; + } + vectorCopy = BaseVector::copy(*vector, pool_.get()); + testPartitionedVector(vectorCopy, partitions, 3); + + if (vector->size() > 4) { + // Test with four partitions where the first partition is empty + for (uint32_t i = 0; i < partitions.size(); ++i) { + partitions[i] = i % 3 + 1; + } + vectorCopy = BaseVector::copy(*vector, pool_.get()); + testPartitionedVector(vectorCopy, partitions, 4); + + // Test with four partitions where the last partition is empty + for (uint32_t i = 0; i < partitions.size(); ++i) { + partitions[i] = i % 3; + } + vectorCopy = BaseVector::copy(*vector, pool_.get()); + testPartitionedVector(vectorCopy, partitions, 4); + } + + // Test with one value per partition + if (vector->size() > 0) { + std::iota(partitions.begin(), partitions.end(), 0); + vectorCopy = BaseVector::copy(*vector, pool_.get()); + testPartitionedVector(vectorCopy, partitions, numRows); + } + + // Test with random partitions (number of partitions <= number of values) + std::uniform_int_distribution<> dis(0, numRows - 1); + uint32_t maxPartition = 0; + for (uint32_t i = 0; i < numRows; ++i) { + partitions[i] = dis(gen_); + maxPartition = std::max(maxPartition, partitions[i]); + } + vectorCopy = BaseVector::copy(*vector, pool_.get()); + testPartitionedVector(vectorCopy, partitions, maxPartition + 1); + } +}; + +TEST_P(PartitioningVectorTest, testFlatVector) { + // Number of values in the vector to be partitioned. This is passed as a test + // parameter and is used to test different vector sizes, including edge cases + // like 0 and 1. + const int numValues = GetParam(); + + // Random values, no nulls + testVectorPartitioning( + makeFlatVector(numValues, [](auto row) { return row; })); + + // Random values, with half number of nulls + testVectorPartitioning( + makeFlatVector( + numValues, [](auto row) { return row; }, nullEvery(2, 1))); + + // All nulls + testVectorPartitioning(makeAllNullFlatVector(numValues)); +} + +// Test with different vector sizes, including edge cases like 0 and 1. +INSTANTIATE_TEST_SUITE_P( + FlatVectorSizes, + PartitioningVectorTest, + ::testing::Values(0, 1, 10, 10000)); + +} // namespace facebook::velox::test diff --git a/velox/vector/tests/utils/CMakeLists.txt b/velox/vector/tests/utils/CMakeLists.txt index 4d61ce356e6..a282faa960a 100644 --- a/velox/vector/tests/utils/CMakeLists.txt +++ b/velox/vector/tests/utils/CMakeLists.txt @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -add_library(velox_vector_test_lib VectorMaker.cpp VectorTestBase.cpp) +add_library(velox_vector_test_lib PartitionedVectorTestBase.cpp VectorMaker.cpp VectorTestBase.cpp) target_link_libraries( velox_vector_test_lib diff --git a/velox/vector/tests/utils/PartitionedVectorTestBase.cpp b/velox/vector/tests/utils/PartitionedVectorTestBase.cpp new file mode 100644 index 00000000000..6c939dfb569 --- /dev/null +++ b/velox/vector/tests/utils/PartitionedVectorTestBase.cpp @@ -0,0 +1,126 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/vector/tests/utils/PartitionedVectorTestBase.h" + +namespace facebook::velox::test { + +VectorPtr PartitionedVectorTestBase::canonicalize(VectorPtr vector) { + auto numRows = vector->size(); + + auto indices = makeIndices(numRows, [&](auto row) { return row; }); + vector_size_t* indicesRange = indices->asMutable(); + + // Sort the indices based on the vector values + std::stable_sort( + indicesRange, + indicesRange + numRows, + [&](vector_size_t left, vector_size_t right) { + return vector->compare(vector.get(), left, right) < 0; + }); + + auto sortedVector = wrapInDictionary(indices, numRows, vector); + return sortedVector; +} + +std::vector PartitionedVectorTestBase::partitionVectorByWrapping( + VectorPtr vector, + const std::vector& partitions, + uint32_t numPartitions) { + auto numRows = vector->size(); + + // Count the number of rows in each partition + std::vector partitionRowCounts(numPartitions, 0); + for (int i = 0; i < numRows; i++) { + partitionRowCounts[partitions[i]]++; + } + + std::vector partitionedVectors(numPartitions, nullptr); + + for (int p = 0; p < numPartitions; p++) { + auto numRowsInPartition = partitionRowCounts[p]; + + if (numRowsInPartition == 0) { + partitionedVectors[p] = + BaseVector::create(vector->type(), 0, pool_.get()); + continue; + } + + // Create an indices buffer for each partition, and fill it with the row + // indices for that partition. + std::vector rowIdsInPartition(numRowsInPartition); + vector_size_t offset = 0; + for (vector_size_t i = 0; i < numRows; ++i) { + if (partitions[i] == p) { + VELOX_DCHECK_LT(offset, numRowsInPartition); + rowIdsInPartition[offset++] = i; + } + } + VELOX_CHECK_EQ(offset, numRowsInPartition); + auto indices = makeIndices(partitionRowCounts[p], [&](auto row) { + return rowIdsInPartition[row]; + }); + + // Simulate partitioning by building the DictionaryVector with the + // partitioned indices + // Copy firsts because wrapInDictionary would take the ownership of the + // vector + VectorPtr vectorCopy = BaseVector::copy(*vector, pool_.get()); + auto dictionaryVector = BaseVector::wrapInDictionary( + nullptr, indices, numRowsInPartition, vectorCopy); + partitionedVectors[p] = canonicalize(dictionaryVector); + } + return partitionedVectors; +} + +std::vector PartitionedVectorTestBase::partitionRowVectors( + const std::vector& rowVectors, + int32_t numPartitions, + core::PartitionFunction* partitionFunction) { + // RowVectorPtr mergedRowVector = mergeRowVectors(rowVectors); + VectorPtr mergedRowVector = + mergeVectors((const std::vector&)rowVectors); + auto totalNumRows = mergedRowVector->size(); + + std::vector partitions(totalNumRows, 0); + if (numPartitions > 1) { + auto rowType = asRowType(mergedRowVector->type()); + // auto partitionFunction = createPartitionFunction(rowType, {0}); + partitionFunction->partition(*mergedRowVector->as(), partitions); + } + + std::vector partitionedVectors = + partitionVectorByWrapping(mergedRowVector, partitions, numPartitions); + + for (auto& vector : partitionedVectors) { + vector = canonicalize(vector); + } + return partitionedVectors; +} + +VectorPtr PartitionedVectorTestBase::mergeVectors( + const std::vector& vectors) { + // We have to count the total number of rows first in order to allocate the + // mergedRowVector. + auto mergedVector = BaseVector::copy(*vectors[0]); + for (auto i = 1; i < vectors.size(); ++i) { + mergedVector->append(vectors[i].get()); + } + + return mergedVector; +} + +} // namespace facebook::velox::test diff --git a/velox/vector/tests/utils/PartitionedVectorTestBase.h b/velox/vector/tests/utils/PartitionedVectorTestBase.h new file mode 100644 index 00000000000..b2c50761edc --- /dev/null +++ b/velox/vector/tests/utils/PartitionedVectorTestBase.h @@ -0,0 +1,42 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/core/PlanNode.h" +#include "velox/vector/PartitionedVector.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +namespace facebook::velox::test { + +class PartitionedVectorTestBase : public VectorTestBase { + protected: + std::vector partitionVectorByWrapping( + VectorPtr vector, + const std::vector& partitions, + uint32_t numPartitions); + + std::vector partitionRowVectors( + const std::vector& rowVectors, + int32_t numPartitions, + core::PartitionFunction* partitionFunction); + + VectorPtr canonicalize(VectorPtr vector); + + VectorPtr mergeVectors(const std::vector& vectors); +}; + +} // namespace facebook::velox::test From ae7bc864ab3395e96a81d2845599d1784725d61b Mon Sep 17 00:00:00 2001 From: yingsu00 Date: Thu, 5 Mar 2026 07:15:03 -0800 Subject: [PATCH 02/10] feat: Add PartitionedRowVector implementation Signed-off-by: Xin Zhang Alchemy-item: (ID = 1167) Add PartitionedRowVector commit 1/1 - f2af427191ae48de9e2b65b4d6ef6e3525673435 --- velox/vector/PartitionedVector.cpp | 79 +++++++++++++++++++- velox/vector/PartitionedVector.h | 27 +++++++ velox/vector/tests/PartitionedVectorTest.cpp | 42 +++++++++++ 3 files changed, 147 insertions(+), 1 deletion(-) diff --git a/velox/vector/PartitionedVector.cpp b/velox/vector/PartitionedVector.cpp index 43fb7fb5d53..0999e59b351 100644 --- a/velox/vector/PartitionedVector.cpp +++ b/velox/vector/PartitionedVector.cpp @@ -211,6 +211,26 @@ PartitionedVectorPtr createPartitionedFlatVector( return partitionedFlatVector; } +PartitionedVectorPtr createPartitionedRowVector( + VectorPtr vector, + const std::vector& partitions, + uint32_t numPartitions, + const BufferPtr& endPartitionOffsets, + PartitionBuildContext& ctx, + velox::memory::MemoryPool* pool) { + auto rowVector = std::dynamic_pointer_cast(vector); + VELOX_CHECK_NOT_NULL(rowVector); + + auto partitionedRowVector = std::make_shared( + rowVector, numPartitions, endPartitionOffsets, pool); + + // Always call partition() to initialize partitionedChildren_, even when + // numPartitions == 1, so that partitionAt() can reconstruct the RowVector. + partitionedRowVector->partition(partitions, ctx); + + return partitionedRowVector; +} + } // namespace PartitionedVector::~PartitionedVector() = default; @@ -272,7 +292,11 @@ PartitionedVectorPtr PartitionedVector::create( return partitionedFlatVector; } - case VectorEncoding::Simple::ROW: + case VectorEncoding::Simple::ROW: { + return createPartitionedRowVector( + vector, partitions, numPartitions, endPartitionOffsets, ctx, pool); + } + case VectorEncoding::Simple::ARRAY: case VectorEncoding::Simple::MAP: case VectorEncoding::Simple::DICTIONARY: @@ -340,4 +364,57 @@ VectorPtr PartitionedFlatVector::partitionAt(uint32_t partition) const { return vector_->slice(beginOffset, numRowsInPartition); } +void PartitionedRowVector::partition( + const std::vector& partitions, + PartitionBuildContext& ctx) { + auto* rowVector = vector_->as(); + partitionedChildren_.reserve(rowVector->childrenSize()); + + for (const auto& child : rowVector->children()) { + partitionedChildren_.push_back(PartitionedVector::create( + child, partitions, numPartitions_, endPartitionOffsets_, ctx, pool_)); + } + + if (numPartitions_ > 1) { + Byte* rawNulls = reinterpret_cast(vector_->mutableRawNulls()); + if (rawNulls) { + partitionBitsInPlace( + rawNulls, partitions, numPartitions_, ctx, endPartitionOffsets_, pool_); + } + } +} + +VectorPtr PartitionedRowVector::partitionAt(uint32_t partition) const { + VELOX_CHECK_LT(partition, numPartitions_); + + vector_size_t beginOffset = + partition == 0 ? 0 : rawEndPartitionOffsets_[partition - 1]; + vector_size_t numRowsInPartition = + rawEndPartitionOffsets_[partition] - beginOffset; + + std::vector children; + children.reserve(partitionedChildren_.size()); + for (const auto& child : partitionedChildren_) { + children.push_back(child->partitionAt(partition)); + } + + BufferPtr nulls = nullptr; + if (numRowsInPartition > 0 && vector_->rawNulls()) { + nulls = AlignedBuffer::allocate(numRowsInPartition, pool_); + bits::copyBits( + vector_->rawNulls(), + beginOffset, + nulls->asMutable(), + 0, + numRowsInPartition); + } + + return std::make_shared( + pool_, + vector_->type(), + std::move(nulls), + numRowsInPartition, + std::move(children)); +} + } // namespace facebook::velox diff --git a/velox/vector/PartitionedVector.h b/velox/vector/PartitionedVector.h index 8c0983813e9..2d7d67adda8 100644 --- a/velox/vector/PartitionedVector.h +++ b/velox/vector/PartitionedVector.h @@ -241,4 +241,31 @@ class PartitionedFlatVector : public PartitionedVector { } }; +/// Partitions a RowVector in-place so that rows belonging to the same +/// partition occupy a contiguous range. Recursively partitions each child +/// column using PartitionedVector. +class PartitionedRowVector : public PartitionedVector { + public: + PartitionedRowVector( + const VectorPtr& rowVector, + uint32_t numPartitions, + const BufferPtr& partitionOffsets, + velox::memory::MemoryPool* pool) + : PartitionedVector(rowVector, numPartitions, partitionOffsets, pool) {} + + void partition( + const std::vector& partitions, + PartitionBuildContext& ctx) override; + + VectorPtr partitionAt(uint32_t partition) const override; + + const vector_size_t* rawSizes() override { + VELOX_UNREACHABLE("PartitionedRowVector does not implement rawSizes()"); + } + + private: + /// Partitioned child columns, one per child of the underlying RowVector. + std::vector partitionedChildren_; +}; + } // namespace facebook::velox diff --git a/velox/vector/tests/PartitionedVectorTest.cpp b/velox/vector/tests/PartitionedVectorTest.cpp index df5b586ec6a..19043a3145c 100644 --- a/velox/vector/tests/PartitionedVectorTest.cpp +++ b/velox/vector/tests/PartitionedVectorTest.cpp @@ -159,6 +159,48 @@ TEST_P(PartitioningVectorTest, testFlatVector) { testVectorPartitioning(makeAllNullFlatVector(numValues)); } +TEST_P(PartitioningVectorTest, testRowVector) { + const int numValues = GetParam(); + + // Two flat columns, no nulls at any level. + testVectorPartitioning(makeRowVector({ + makeFlatVector(numValues, [](auto row) { return row; }), + makeFlatVector(numValues, [](auto row) { return row * 10; }), + })); + + // Two flat columns with nullable children. + testVectorPartitioning(makeRowVector({ + makeFlatVector( + numValues, [](auto row) { return row; }, nullEvery(2)), + makeFlatVector( + numValues, [](auto row) { return row * 10; }, nullEvery(3)), + })); + + // Row-level nulls with no child nulls. + testVectorPartitioning(makeRowVector( + {makeFlatVector(numValues, [](auto row) { return row; })}, + nullEvery(2))); + + // Row-level nulls combined with nullable children. + testVectorPartitioning(makeRowVector( + {makeFlatVector( + numValues, [](auto row) { return row; }, nullEvery(3))}, + nullEvery(2))); + + // All rows null. + testVectorPartitioning(makeRowVector( + {makeFlatVector(numValues, [](auto row) { return row; })}, + [](auto /*row*/) { return true; })); + + // Nested RowVector. + testVectorPartitioning(makeRowVector({ + makeFlatVector(numValues, [](auto row) { return row; }), + makeRowVector({ + makeFlatVector(numValues, [](auto row) { return row; }), + }), + })); +} + // Test with different vector sizes, including edge cases like 0 and 1. INSTANTIATE_TEST_SUITE_P( FlatVectorSizes, From 86db93bd4c3bf7eab03da64144b5c82f7d767cbc Mon Sep 17 00:00:00 2001 From: Xin Zhang Date: Tue, 10 Mar 2026 12:09:19 +0000 Subject: [PATCH 03/10] refactor: Move initializeCursorPartitionOffsets into partitionFixedWidthValuesInPlace --- velox/vector/PartitionedVector.cpp | 31 +++++++++++++----------------- 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/velox/vector/PartitionedVector.cpp b/velox/vector/PartitionedVector.cpp index 0999e59b351..59a9131dfe1 100644 --- a/velox/vector/PartitionedVector.cpp +++ b/velox/vector/PartitionedVector.cpp @@ -87,26 +87,31 @@ void partitionFixedWidthValuesInPlace( T* values, const std::vector& partitions, uint32_t numPartitions, - vector_size_t* cursorPartitionOffsets, - const vector_size_t* endPartitionOffsets) { + const BufferPtr& endPartitionOffsets, + PartitionBuildContext& ctx, + velox::memory::MemoryPool* pool) { VELOX_DCHECK_NOT_NULL(values); - VELOX_DCHECK_NOT_NULL(cursorPartitionOffsets); VELOX_DCHECK_NOT_NULL(endPartitionOffsets); + initializeCursorPartitionOffsets( + ctx.cursorPartitionOffsets, endPartitionOffsets, numPartitions, pool); + auto* rawCursorOffsets = + ctx.cursorPartitionOffsets->asMutable(); + const auto* rawEndOffsets = endPartitionOffsets->as(); for (auto currentPartition = 0; currentPartition < numPartitions; currentPartition++) { - vector_size_t& offset = cursorPartitionOffsets[currentPartition]; - vector_size_t endOffset = endPartitionOffsets[currentPartition]; + auto& offset = rawCursorOffsets[currentPartition]; + auto endOffset = rawEndOffsets[currentPartition]; while (offset < endOffset) { uint32_t targetPartition = partitions[offset]; while (targetPartition != currentPartition) { - auto destinationOffset = cursorPartitionOffsets[targetPartition]++; + auto destinationOffset = rawCursorOffsets[targetPartition]++; std::swap(values[destinationOffset], values[offset]); targetPartition = partitions[destinationOffset]; } - offset = ++cursorPartitionOffsets[currentPartition]; + offset = ++rawCursorOffsets[currentPartition]; } } } @@ -120,20 +125,10 @@ void partitionFixedWidthValues( PartitionBuildContext& ctx, velox::memory::MemoryPool* pool) { VELOX_DCHECK_NOT_NULL(inputBuffer); - VELOX_DCHECK_NOT_NULL(endPartitionOffsets); auto input = inputBuffer->asMutable(); - - initializeCursorPartitionOffsets( - ctx.cursorPartitionOffsets, endPartitionOffsets, numPartitions, pool); - - vector_size_t* rawCursorOffsets = - ctx.cursorPartitionOffsets->asMutable(); - const vector_size_t* rawEndOffsets = - endPartitionOffsets->asMutable(); - partitionFixedWidthValuesInPlace( - input, partitions, numPartitions, rawCursorOffsets, rawEndOffsets); + input, partitions, numPartitions, endPartitionOffsets, ctx, pool); } // Swap two bits between two bytes From 6dd3661c7afef52c42ed1c5ca83c9e57e21ec2b3 Mon Sep 17 00:00:00 2001 From: Xin Zhang Date: Tue, 10 Mar 2026 12:01:55 +0000 Subject: [PATCH 04/10] fix: Add bool specialization for partitionFixedWidthValues --- velox/vector/PartitionedVector.cpp | 66 ++++++++++++++------ velox/vector/tests/PartitionedVectorTest.cpp | 16 +++++ 2 files changed, 62 insertions(+), 20 deletions(-) diff --git a/velox/vector/PartitionedVector.cpp b/velox/vector/PartitionedVector.cpp index 59a9131dfe1..e5806620feb 100644 --- a/velox/vector/PartitionedVector.cpp +++ b/velox/vector/PartitionedVector.cpp @@ -116,21 +116,6 @@ void partitionFixedWidthValuesInPlace( } } -template -void partitionFixedWidthValues( - BufferPtr& inputBuffer, - const std::vector& partitions, - const BufferPtr& endPartitionOffsets, - uint32_t numPartitions, - PartitionBuildContext& ctx, - velox::memory::MemoryPool* pool) { - VELOX_DCHECK_NOT_NULL(inputBuffer); - - auto input = inputBuffer->asMutable(); - partitionFixedWidthValuesInPlace( - input, partitions, numPartitions, endPartitionOffsets, ctx, pool); -} - // Swap two bits between two bytes void swapBit(Byte& byte1, BitIndex bit1, Byte& byte2, BitIndex bit2) { // Calculate the difference between the bits @@ -151,9 +136,9 @@ void partitionBitsInPlace( initializeCursorPartitionOffsets( ctx.cursorPartitionOffsets, endPartitionOffsets, numPartitions, pool); - auto rawCursorOffsets = + auto* rawCursorOffsets = ctx.cursorPartitionOffsets->asMutable(); - auto rawEndOffsets = endPartitionOffsets->asMutable(); + const auto* rawEndOffsets = endPartitionOffsets->as(); for (uint32_t partition = 0; partition < numPartitions; partition++) { auto& offset = rawCursorOffsets[partition]; @@ -184,6 +169,36 @@ void partitionBitsInPlace( } } +template +void partitionFixedWidthValues( + BufferPtr& inputBuffer, + const std::vector& partitions, + const BufferPtr& endPartitionOffsets, + uint32_t numPartitions, + PartitionBuildContext& ctx, + velox::memory::MemoryPool* pool) { + VELOX_DCHECK_NOT_NULL(inputBuffer); + + auto input = inputBuffer->asMutable(); + partitionFixedWidthValuesInPlace( + input, partitions, numPartitions, endPartitionOffsets, ctx, pool); +} + +template <> +void partitionFixedWidthValues( + BufferPtr& inputBuffer, + const std::vector& partitions, + const BufferPtr& endPartitionOffsets, + uint32_t numPartitions, + PartitionBuildContext& ctx, + velox::memory::MemoryPool* pool) { + VELOX_DCHECK_NOT_NULL(inputBuffer); + + auto input = inputBuffer->asMutable(); + partitionBitsInPlace( + input, partitions, numPartitions, ctx, endPartitionOffsets, pool); +} + template PartitionedVectorPtr createPartitionedFlatVector( VectorPtr vector, @@ -366,15 +381,26 @@ void PartitionedRowVector::partition( partitionedChildren_.reserve(rowVector->childrenSize()); for (const auto& child : rowVector->children()) { - partitionedChildren_.push_back(PartitionedVector::create( - child, partitions, numPartitions_, endPartitionOffsets_, ctx, pool_)); + partitionedChildren_.push_back( + PartitionedVector::create( + child, + partitions, + numPartitions_, + endPartitionOffsets_, + ctx, + pool_)); } if (numPartitions_ > 1) { Byte* rawNulls = reinterpret_cast(vector_->mutableRawNulls()); if (rawNulls) { partitionBitsInPlace( - rawNulls, partitions, numPartitions_, ctx, endPartitionOffsets_, pool_); + rawNulls, + partitions, + numPartitions_, + ctx, + endPartitionOffsets_, + pool_); } } } diff --git a/velox/vector/tests/PartitionedVectorTest.cpp b/velox/vector/tests/PartitionedVectorTest.cpp index 19043a3145c..f35f42e0218 100644 --- a/velox/vector/tests/PartitionedVectorTest.cpp +++ b/velox/vector/tests/PartitionedVectorTest.cpp @@ -159,6 +159,22 @@ TEST_P(PartitioningVectorTest, testFlatVector) { testVectorPartitioning(makeAllNullFlatVector(numValues)); } +TEST_P(PartitioningVectorTest, testFlatBoolVector) { + const int numValues = GetParam(); + + // Random values, no nulls + testVectorPartitioning( + makeFlatVector(numValues, [](auto row) { return row % 2 == 0; })); + + // Random values, with half number of nulls + testVectorPartitioning( + makeFlatVector( + numValues, [](auto row) { return row % 2 == 0; }, nullEvery(2, 1))); + + // All nulls + testVectorPartitioning(makeAllNullFlatVector(numValues)); +} + TEST_P(PartitioningVectorTest, testRowVector) { const int numValues = GetParam(); From 2706c1e80f9463bd4fdd805e296839f964437ca3 Mon Sep 17 00:00:00 2001 From: yingsu00 Date: Thu, 12 Mar 2026 06:19:10 -0700 Subject: [PATCH 05/10] fix: Avoid allocating null buffer when partitioning null-free vectors PartitionedFlatVector::partition() and PartitionedRowVector::partition() called mutableRawNulls() unconditionally. mutableRawNulls() allocates a null buffer if one does not exist, causing mayHaveNulls() to return true for every vector after partitioning, even when the original had no nulls. Fix both sites to check rawNulls() first and only call mutableRawNulls() when a null buffer already exists. Add noNullBufferAllocatedForNullFreeFlat and noNullBufferAllocatedForNullFreeRow tests to PartitionedVectorTest to cover this case. # Conflicts: # velox/vector/PartitionedVector.cpp --- velox/vector/PartitionedVector.cpp | 12 ++--- velox/vector/tests/PartitionedVectorTest.cpp | 51 ++++++++++++++++++++ 2 files changed, 56 insertions(+), 7 deletions(-) diff --git a/velox/vector/PartitionedVector.cpp b/velox/vector/PartitionedVector.cpp index e5806620feb..7713f8ecfd2 100644 --- a/velox/vector/PartitionedVector.cpp +++ b/velox/vector/PartitionedVector.cpp @@ -346,8 +346,8 @@ template void PartitionedFlatVector::partition( const std::vector& partitions, PartitionBuildContext& ctx) { - Byte* rawNulls = reinterpret_cast(vector_->mutableRawNulls()); - if (rawNulls) { + if (vector_->rawNulls()) { + Byte* rawNulls = reinterpret_cast(vector_->mutableRawNulls()); partitionBitsInPlace( rawNulls, partitions, numPartitions_, ctx, endPartitionOffsets_, pool_); } @@ -391,17 +391,15 @@ void PartitionedRowVector::partition( pool_)); } - if (numPartitions_ > 1) { + if (numPartitions_ > 1 && vector_->rawNulls()) { Byte* rawNulls = reinterpret_cast(vector_->mutableRawNulls()); - if (rawNulls) { - partitionBitsInPlace( - rawNulls, + partitionBitsInPlace( + rawNulls, partitions, numPartitions_, ctx, endPartitionOffsets_, pool_); - } } } diff --git a/velox/vector/tests/PartitionedVectorTest.cpp b/velox/vector/tests/PartitionedVectorTest.cpp index f35f42e0218..f87c9514d4e 100644 --- a/velox/vector/tests/PartitionedVectorTest.cpp +++ b/velox/vector/tests/PartitionedVectorTest.cpp @@ -217,6 +217,57 @@ TEST_P(PartitioningVectorTest, testRowVector) { })); } +// Partitioning a null-free vector must not allocate a null buffer. +TEST_P(PartitioningVectorTest, noNullBufferAllocatedForNullFreeFlat) { + const int numValues = GetParam(); + if (numValues == 0) { + return; + } + + auto flat = makeFlatVector(numValues, [](auto row) { return row; }); + ASSERT_FALSE(flat->mayHaveNulls()); + + std::vector partitions(numValues); + for (int i = 0; i < numValues; ++i) { + partitions[i] = i % 2; + } + + auto pv = PartitionedVector::create(flat, partitions, 2, ctx_, pool_.get()); + EXPECT_FALSE(pv->baseVector()->mayHaveNulls()) + << "partition() must not allocate a null buffer for a null-free FlatVector"; +} + +// Partitioning a null-free RowVector must not allocate null buffers on the +// row vector or any of its children. +TEST_P(PartitioningVectorTest, noNullBufferAllocatedForNullFreeRow) { + const int numValues = GetParam(); + if (numValues == 0) { + return; + } + + auto row = makeRowVector({ + makeFlatVector(numValues, [](auto row) { return row; }), + makeFlatVector(numValues, [](auto row) { return row * 10; }), + }); + ASSERT_FALSE(row->mayHaveNulls()); + ASSERT_FALSE(row->childAt(0)->mayHaveNulls()); + ASSERT_FALSE(row->childAt(1)->mayHaveNulls()); + + std::vector partitions(numValues); + for (int i = 0; i < numValues; ++i) { + partitions[i] = i % 2; + } + + auto pv = PartitionedVector::create(row, partitions, 2, ctx_, pool_.get()); + auto* base = pv->baseVector()->as(); + EXPECT_FALSE(base->mayHaveNulls()) + << "partition() must not allocate a null buffer for a null-free RowVector"; + EXPECT_FALSE(base->childAt(0)->mayHaveNulls()) + << "partition() must not allocate a null buffer for null-free child 0"; + EXPECT_FALSE(base->childAt(1)->mayHaveNulls()) + << "partition() must not allocate a null buffer for null-free child 1"; +} + // Test with different vector sizes, including edge cases like 0 and 1. INSTANTIATE_TEST_SUITE_P( FlatVectorSizes, From fe85841198d5fb2c4309a2c39d3bc9c3a14b49fb Mon Sep 17 00:00:00 2001 From: Xin Zhang Date: Fri, 13 Mar 2026 11:18:57 +0000 Subject: [PATCH 06/10] feat: Add ParitionedConstantVector implementation --- velox/vector/PartitionedVector.cpp | 28 +++++++++++++++----- velox/vector/PartitionedVector.h | 27 +++++++++++++++++++ velox/vector/tests/PartitionedVectorTest.cpp | 11 ++++++++ 3 files changed, 59 insertions(+), 7 deletions(-) diff --git a/velox/vector/PartitionedVector.cpp b/velox/vector/PartitionedVector.cpp index 7713f8ecfd2..1bee4029a66 100644 --- a/velox/vector/PartitionedVector.cpp +++ b/velox/vector/PartitionedVector.cpp @@ -307,12 +307,16 @@ PartitionedVectorPtr PartitionedVector::create( vector, partitions, numPartitions, endPartitionOffsets, ctx, pool); } + case VectorEncoding::Simple::CONSTANT: { + return std::make_shared( + vector, numPartitions, endPartitionOffsets, pool); + } + case VectorEncoding::Simple::ARRAY: case VectorEncoding::Simple::MAP: case VectorEncoding::Simple::DICTIONARY: case VectorEncoding::Simple::BIASED: case VectorEncoding::Simple::SEQUENCE: - case VectorEncoding::Simple::CONSTANT: case VectorEncoding::Simple::LAZY: VELOX_UNSUPPORTED( "Unsupported vector encoding for PartitionedVector: {}", @@ -394,12 +398,7 @@ void PartitionedRowVector::partition( if (numPartitions_ > 1 && vector_->rawNulls()) { Byte* rawNulls = reinterpret_cast(vector_->mutableRawNulls()); partitionBitsInPlace( - rawNulls, - partitions, - numPartitions_, - ctx, - endPartitionOffsets_, - pool_); + rawNulls, partitions, numPartitions_, ctx, endPartitionOffsets_, pool_); } } @@ -436,4 +435,19 @@ VectorPtr PartitionedRowVector::partitionAt(uint32_t partition) const { std::move(children)); } +void PartitionedConstantVector::partition( + const std::vector& /*partitions*/, + PartitionBuildContext& /*ctx*/) {} + +VectorPtr PartitionedConstantVector::partitionAt(uint32_t partition) const { + VELOX_CHECK_LT(partition, numPartitions_); + + const vector_size_t beginOffset = + partition == 0 ? 0 : rawEndPartitionOffsets_[partition - 1]; + const vector_size_t numRowsInPartition = + rawEndPartitionOffsets_[partition] - beginOffset; + + return vector_->slice(0, numRowsInPartition); +} + } // namespace facebook::velox diff --git a/velox/vector/PartitionedVector.h b/velox/vector/PartitionedVector.h index 2d7d67adda8..c1c417e92a6 100644 --- a/velox/vector/PartitionedVector.h +++ b/velox/vector/PartitionedVector.h @@ -268,4 +268,31 @@ class PartitionedRowVector : public PartitionedVector { std::vector partitionedChildren_; }; +/// Partitions a ConstantVector by reusing the same constant payload and +/// returning constant slices sized to each partition. +class PartitionedConstantVector : public PartitionedVector { + public: + PartitionedConstantVector( + const VectorPtr& constantVector, + uint32_t numPartitions, + const BufferPtr& partitionOffsets, + velox::memory::MemoryPool* pool) + : PartitionedVector( + constantVector, + numPartitions, + partitionOffsets, + pool) {} + + void partition( + const std::vector& partitions, + PartitionBuildContext& ctx) override; + + VectorPtr partitionAt(uint32_t partition) const override; + + const vector_size_t* rawSizes() override { + VELOX_UNREACHABLE( + "PartitionedConstantVector does not implement rawSizes()"); + } +}; + } // namespace facebook::velox diff --git a/velox/vector/tests/PartitionedVectorTest.cpp b/velox/vector/tests/PartitionedVectorTest.cpp index f87c9514d4e..4a16f5130ba 100644 --- a/velox/vector/tests/PartitionedVectorTest.cpp +++ b/velox/vector/tests/PartitionedVectorTest.cpp @@ -217,6 +217,17 @@ TEST_P(PartitioningVectorTest, testRowVector) { })); } +TEST_P(PartitioningVectorTest, testConstantVector) { + const int numValues = GetParam(); + + testVectorPartitioning(makeConstant(7, numValues)); + testVectorPartitioning(makeConstant(std::nullopt, numValues)); + testVectorPartitioning(makeConstantRow( + ROW({"c0", "c1"}, {INTEGER(), VARCHAR()}), + variant::row({variant(11), variant("constant")}), + numValues)); +} + // Partitioning a null-free vector must not allocate a null buffer. TEST_P(PartitioningVectorTest, noNullBufferAllocatedForNullFreeFlat) { const int numValues = GetParam(); From 55d3fc446af872dfc1118eb3fc07037fb46af50b Mon Sep 17 00:00:00 2001 From: Xin Zhang Date: Wed, 4 Mar 2026 10:19:15 +0000 Subject: [PATCH 07/10] Add PartitionedVector benchmark --- velox/vector/benchmarks/CMakeLists.txt | 10 + .../benchmarks/PartitionedVectorBenchmark.cpp | 184 ++++++++++++++++++ 2 files changed, 194 insertions(+) create mode 100644 velox/vector/benchmarks/PartitionedVectorBenchmark.cpp diff --git a/velox/vector/benchmarks/CMakeLists.txt b/velox/vector/benchmarks/CMakeLists.txt index 0cb3c78bfd8..8c1840daa1b 100644 --- a/velox/vector/benchmarks/CMakeLists.txt +++ b/velox/vector/benchmarks/CMakeLists.txt @@ -45,3 +45,13 @@ target_link_libraries( gflags::gflags glog::glog ) + +add_executable(velox_vector_partitioned_vector_benchmark PartitionedVectorBenchmark.cpp) +target_link_libraries( + velox_vector_partitioned_vector_benchmark + velox_dwio_common_test_utils + velox_vector + velox_vector_test_lib + Folly::folly + Folly::follybenchmark +) diff --git a/velox/vector/benchmarks/PartitionedVectorBenchmark.cpp b/velox/vector/benchmarks/PartitionedVectorBenchmark.cpp new file mode 100644 index 00000000000..681a2e0c188 --- /dev/null +++ b/velox/vector/benchmarks/PartitionedVectorBenchmark.cpp @@ -0,0 +1,184 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include + +#include "dwio/common/tests/utils/BatchMaker.h" +#include "vector/PartitionedVector.h" + +using namespace facebook::velox; +using namespace facebook::velox::test; + +namespace facebook::velox::test { + +namespace { + +thread_local auto gen = std::mt19937(42); + +auto noNulls = [](vector_size_t) { return false; }; + +auto allNulls = [](vector_size_t) { return true; }; + +auto halfNulls = [](vector_size_t row) { return row % 2 == 0; }; + +template +RowTypePtr scalarTypeGenerator(int32_t numColumns) { + return ROW(std::vector(numColumns, createScalarType())); +} + +RowTypePtr dateTypeGenerator(int32_t numColumns) { + return ROW(std::vector(numColumns, DATE())); +} + +RowTypePtr shortDecimalTypeGenerator(int32_t numColumns) { + return ROW(std::vector(numColumns, DECIMAL(10, 2))); +} + +RowTypePtr longDecimalTypeGenerator(int32_t numColumns) { + return ROW(std::vector(numColumns, DECIMAL(20, 3))); +} + +RowTypePtr mixedFlatTypeGenerator(int32_t numColumns) { + const std::vector typeSelection = { + BOOLEAN(), + TINYINT(), + SMALLINT(), + INTEGER(), + BIGINT(), + HUGEINT(), + REAL(), + DOUBLE(), + TIMESTAMP(), + DATE(), + DECIMAL(10, 2), + DECIMAL(20, 3), + }; + + std::vector types; + types.reserve(numColumns); + + for (int i = 0; i < numColumns; ++i) { + types.push_back(typeSelection[i % typeSelection.size()]); + } + + std::ranges::shuffle(types, gen); + + return ROW(std::move(types)); +} + +auto randomPartitionFunction = [](const RowVectorPtr& vector, + uint32_t numPartitions, + std::vector& partitions) { + partitions.resize(vector->size()); + for (int i = 0; i < vector->size(); ++i) { + partitions[i] = gen() % numPartitions; + } +}; + +std::shared_ptr pool; +std::vector partitions; + +RowVectorPtr createTestVector( + const std::function& rowTypeGenerator, + vector_size_t numRows, + int32_t numColumns, + const std::function& isNullAt) { + auto rowType = rowTypeGenerator(numColumns); + const auto batch = BatchMaker::createBatch(rowType, numRows, *pool, isNullAt); + return std::static_pointer_cast(batch); +} + +} // namespace + +void runBM( + uint32_t iterations, + const std::function& rowTypeGenerator, + int32_t numColumns, + uint32_t numPartitions, + const std::function& isNullAt = noNulls, + vector_size_t numRows = 10000) { + folly::BenchmarkSuspender suspender; + PartitionBuildContext ctx; + auto vector = + createTestVector(rowTypeGenerator, numRows, numColumns, isNullAt); + randomPartitionFunction(vector, numPartitions, partitions); + for (uint32_t i = 0; i < iterations; ++i) { + // PartitionedVector::create mutates its input, so each iteration needs a + // fresh copy to keep inputs consistent. + const auto vectorCopy = std::static_pointer_cast( + BaseVector::copy(*vector, pool.get())); + suspender.dismiss(); + PartitionedVector::create( + vectorCopy, partitions, numPartitions, ctx, pool.get()); + suspender.rehire(); + } +} + +#define BENCHMARK_CONFIG(name, generator, numCols, nulls, numParts) \ + BENCHMARK_NAMED_PARAM( \ + runBM, \ + name##_##numCols##Cols_##nulls##_P##numParts, \ + generator, \ + numCols, \ + numParts, \ + nulls); + +#define BENCHMARK_PARTITIONS(name, generator, numCols, nulls) \ + BENCHMARK_CONFIG(name, generator, numCols, nulls, 4) \ + BENCHMARK_CONFIG(name, generator, numCols, nulls, 16) \ + BENCHMARK_CONFIG(name, generator, numCols, nulls, 64) \ + BENCHMARK_CONFIG(name, generator, numCols, nulls, 256) \ + BENCHMARK_CONFIG(name, generator, numCols, nulls, 1024) + +#define BENCHMARK_SIZES(name, generator, nulls) \ + BENCHMARK_PARTITIONS(name, generator, 1, nulls) \ + BENCHMARK_PARTITIONS(name, generator, 10, nulls) \ + BENCHMARK_PARTITIONS(name, generator, 100, nulls) \ + BENCHMARK_PARTITIONS(name, generator, 1000, nulls) + +#define BENCHMARK_TYPE(name, generator) \ + BENCHMARK_SIZES(name, generator, noNulls) \ + BENCHMARK_SIZES(name, generator, allNulls) \ + BENCHMARK_SIZES(name, generator, halfNulls) + +BENCHMARK_TYPE(BOOLEAN, scalarTypeGenerator); +BENCHMARK_TYPE(SMALLINT, scalarTypeGenerator); +BENCHMARK_TYPE(INTEGER, scalarTypeGenerator); +BENCHMARK_TYPE(BIGINT, scalarTypeGenerator); +BENCHMARK_TYPE(HUGEINT, scalarTypeGenerator); +BENCHMARK_TYPE(REAL, scalarTypeGenerator); +BENCHMARK_TYPE(DOUBLE, scalarTypeGenerator); +BENCHMARK_TYPE(TIMESTAMP, scalarTypeGenerator); +BENCHMARK_TYPE(VARCHAR, scalarTypeGenerator); +BENCHMARK_TYPE(VARBINARY, scalarTypeGenerator); +BENCHMARK_TYPE(DATE, dateTypeGenerator); +BENCHMARK_TYPE(ShortDecimal, shortDecimalTypeGenerator); +BENCHMARK_TYPE(LongDecimal, longDecimalTypeGenerator); +BENCHMARK_TYPE(Mixed, mixedFlatTypeGenerator); + +} // namespace facebook::velox::test + +int main(int argc, char** argv) { + folly::Init init{&argc, &argv}; + memory::MemoryManager::initialize(memory::MemoryManager::Options{}); + pool = memory::memoryManager()->addLeafPool(); + folly::runBenchmarks(); + return 0; +} From 70c3fd882b101c165454f5aeb15320fb6c7aa11f Mon Sep 17 00:00:00 2001 From: yingsu00 Date: Fri, 20 Mar 2026 05:32:26 -0700 Subject: [PATCH 08/10] feat(PartitionedOutput): Add numNullsPerPartition_ to PartitionedVector --- velox/vector/PartitionedVector.cpp | 30 ++++- velox/vector/PartitionedVector.h | 16 +++ velox/vector/tests/PartitionedVectorTest.cpp | 130 ++++++++++++++++++- 3 files changed, 172 insertions(+), 4 deletions(-) diff --git a/velox/vector/PartitionedVector.cpp b/velox/vector/PartitionedVector.cpp index 1bee4029a66..bc83840aa9c 100644 --- a/velox/vector/PartitionedVector.cpp +++ b/velox/vector/PartitionedVector.cpp @@ -214,9 +214,9 @@ PartitionedVectorPtr createPartitionedFlatVector( auto partitionedFlatVector = std::make_shared>( flatVector, numPartitions, endPartitionOffsets, pool); - if (numPartitions > 1) { - partitionedFlatVector->partition(partitions, ctx); - } + // Always call partition() so that numNullsPerPartition_ is populated, + // even when numPartitions == 1 and no data movement is required. + partitionedFlatVector->partition(partitions, ctx); return partitionedFlatVector; } @@ -364,6 +364,18 @@ void PartitionedFlatVector::partition( numPartitions_, ctx, pool_); + + // Count nulls per partition from the now-partitioned null bitmap. + if (const uint64_t* rawNulls = vector_->rawNulls()) { + for (uint32_t p = 0; p < numPartitions_; ++p) { + const vector_size_t begin = p == 0 ? 0 : rawEndPartitionOffsets_[p - 1]; + const vector_size_t end = rawEndPartitionOffsets_[p]; + if (begin < end) { + numNullsPerPartition_[p] = + static_cast(bits::countNulls(rawNulls, begin, end)); + } + } + } } template @@ -400,6 +412,18 @@ void PartitionedRowVector::partition( partitionBitsInPlace( rawNulls, partitions, numPartitions_, ctx, endPartitionOffsets_, pool_); } + + // Count nulls per partition from the now-partitioned null bitmap. + if (const uint64_t* rawNulls = vector_->rawNulls()) { + for (uint32_t p = 0; p < numPartitions_; ++p) { + const vector_size_t begin = p == 0 ? 0 : rawEndPartitionOffsets_[p - 1]; + const vector_size_t end = rawEndPartitionOffsets_[p]; + if (begin < end) { + numNullsPerPartition_[p] = + static_cast(bits::countNulls(rawNulls, begin, end)); + } + } + } } VectorPtr PartitionedRowVector::partitionAt(uint32_t partition) const { diff --git a/velox/vector/PartitionedVector.h b/velox/vector/PartitionedVector.h index c1c417e92a6..eb008f1193b 100644 --- a/velox/vector/PartitionedVector.h +++ b/velox/vector/PartitionedVector.h @@ -149,6 +149,12 @@ class PartitionedVector { return dynamic_cast(this); } + /// Returns the number of null rows in the given partition. + vector_size_t numNullsAt(uint32_t partition) const { + VELOX_DCHECK_LT(partition, numPartitions_); + return numNullsPerPartition_[partition]; + } + TypeKind typeKind() const { return vector_->typeKind(); } @@ -181,6 +187,7 @@ class PartitionedVector { : vector_(vector), numPartitions_(numPartitions), endPartitionOffsets_(endPartitionOffsets), + numNullsPerPartition_(numPartitions, 0), pool_(pool) { VELOX_CHECK_NOT_NULL(vector_); VELOX_CHECK_GT(numPartitions_, 0); @@ -215,6 +222,9 @@ class PartitionedVector { // partitioning. vector_size_t* rawEndPartitionOffsets_; + /// Null row counts per partition, computed during partition(). + std::vector numNullsPerPartition_; + velox::memory::MemoryPool* pool_; }; @@ -259,6 +269,12 @@ class PartitionedRowVector : public PartitionedVector { VectorPtr partitionAt(uint32_t partition) const override; + /// Returns the partitioned child vector at the given column index. + PartitionedVectorPtr childAt(uint32_t col) const { + VELOX_DCHECK_LT(col, partitionedChildren_.size()); + return partitionedChildren_[col]; + } + const vector_size_t* rawSizes() override { VELOX_UNREACHABLE("PartitionedRowVector does not implement rawSizes()"); } diff --git a/velox/vector/tests/PartitionedVectorTest.cpp b/velox/vector/tests/PartitionedVectorTest.cpp index 4a16f5130ba..569a6e6ae9f 100644 --- a/velox/vector/tests/PartitionedVectorTest.cpp +++ b/velox/vector/tests/PartitionedVectorTest.cpp @@ -14,7 +14,6 @@ * limitations under the License. */ #include -#include #include #include @@ -279,6 +278,135 @@ TEST_P(PartitioningVectorTest, noNullBufferAllocatedForNullFreeRow) { << "partition() must not allocate a null buffer for null-free child 1"; } +// numNullsAt() tests +// --------------------------------------------------------------------------- + +// A null-free flat vector must report zero nulls for every partition. +TEST_P(PartitioningVectorTest, numNullsAtFlatNoNulls) { + const int numValues = GetParam(); + auto flat = makeFlatVector(numValues, [](auto row) { return row; }); + + std::vector partitions(numValues); + for (int i = 0; i < numValues; ++i) { + partitions[i] = i % 3; + } + auto pv = PartitionedVector::create(flat, partitions, 3, ctx_, pool_.get()); + for (uint32_t p = 0; p < 3; ++p) { + EXPECT_EQ(pv->numNullsAt(p), 0) << "partition " << p; + } +} + +// A flat vector with every other row null must report the exact per-partition +// null count. The sum across all partitions must equal the total null count. +TEST_P(PartitioningVectorTest, numNullsAtFlatSomeNulls) { + const int numValues = GetParam(); + auto flat = makeFlatVector( + numValues, [](auto row) { return row; }, nullEvery(2)); + + std::vector partitions(numValues); + for (int i = 0; i < numValues; ++i) { + partitions[i] = i % 3; + } + auto pv = PartitionedVector::create(flat, partitions, 3, ctx_, pool_.get()); + + // Per-partition counts must agree with manual bit-scan of the base vector. + const auto* rawNulls = pv->baseVector()->rawNulls(); + const auto* rawOffsets = pv->rawPartitionOffsets(); + for (uint32_t p = 0; p < 3; ++p) { + const vector_size_t begin = p == 0 ? 0 : rawOffsets[p - 1]; + const vector_size_t end = rawOffsets[p]; + const vector_size_t expected = rawNulls + ? BaseVector::countNulls(pv->baseVector()->nulls(), begin, end) + : 0; + EXPECT_EQ(pv->numNullsAt(p), expected) << "partition " << p; + } + + // Sum across partitions must equal the total null count in the source vector. + const vector_size_t total = + pv->numNullsAt(0) + pv->numNullsAt(1) + pv->numNullsAt(2); + EXPECT_EQ(total, BaseVector::countNulls(flat->nulls(), 0, numValues)); +} + +// An all-null flat vector must report numNullsAt(p) == rows in that partition. +TEST_P(PartitioningVectorTest, numNullsAtFlatAllNulls) { + const int numValues = GetParam(); + auto flat = makeAllNullFlatVector(numValues); + + std::vector partitions(numValues); + for (int i = 0; i < numValues; ++i) { + partitions[i] = i % 3; + } + auto pv = PartitionedVector::create(flat, partitions, 3, ctx_, pool_.get()); + + const auto* rawOffsets = pv->rawPartitionOffsets(); + for (uint32_t p = 0; p < 3; ++p) { + const vector_size_t begin = p == 0 ? 0 : rawOffsets[p - 1]; + const vector_size_t numRowsInPartition = rawOffsets[p] - begin; + EXPECT_EQ(pv->numNullsAt(p), numRowsInPartition) << "partition " << p; + } +} + +// A row vector with no row-level nulls must report zero per-partition nulls at +// the row level, even when child columns have nulls. +TEST_P(PartitioningVectorTest, numNullsAtRowNoRowLevelNulls) { + const int numValues = GetParam(); + auto row = makeRowVector({ + makeFlatVector( + numValues, [](auto row) { return row; }, nullEvery(2)), + }); + ASSERT_FALSE(row->mayHaveNulls()); + + std::vector partitions(numValues); + for (int i = 0; i < numValues; ++i) { + partitions[i] = i % 3; + } + auto pv = PartitionedVector::create(row, partitions, 3, ctx_, pool_.get()); + for (uint32_t p = 0; p < 3; ++p) { + EXPECT_EQ(pv->numNullsAt(p), 0) + << "Row-level numNullsAt() must not count child nulls, partition " << p; + } +} + +// A row vector with row-level nulls must report per-partition counts that match +// a manual bit-scan. Child null counts must be counted independently. +TEST_P(PartitioningVectorTest, numNullsAtRowRowLevelNulls) { + const int numValues = GetParam(); + auto row = makeRowVector( + {makeFlatVector( + numValues, [](auto row) { return row; }, nullEvery(3))}, + nullEvery(2)); + + std::vector partitions(numValues); + for (int i = 0; i < numValues; ++i) { + partitions[i] = i % 3; + } + auto pv = PartitionedVector::create(row, partitions, 3, ctx_, pool_.get()); + + const auto* rawOffsets = pv->rawPartitionOffsets(); + for (uint32_t p = 0; p < 3; ++p) { + const vector_size_t begin = p == 0 ? 0 : rawOffsets[p - 1]; + const vector_size_t end = rawOffsets[p]; + const vector_size_t expected = + BaseVector::countNulls(pv->baseVector()->nulls(), begin, end); + EXPECT_EQ(pv->numNullsAt(p), expected) + << "Row-level null count mismatch, partition " << p; + } + + // Child null counts must be tracked independently of row-level nulls. + auto* prv = dynamic_cast(pv.get()); + ASSERT_NE(prv, nullptr); + auto child = prv->childAt(0); + const auto* childOffsets = child->rawPartitionOffsets(); + for (uint32_t p = 0; p < 3; ++p) { + const vector_size_t begin = p == 0 ? 0 : childOffsets[p - 1]; + const vector_size_t end = childOffsets[p]; + const vector_size_t expected = + BaseVector::countNulls(child->baseVector()->nulls(), begin, end); + EXPECT_EQ(child->numNullsAt(p), expected) + << "Child null count mismatch, partition " << p; + } +} + // Test with different vector sizes, including edge cases like 0 and 1. INSTANTIATE_TEST_SUITE_P( FlatVectorSizes, From f68e4a131a46ca2a98f198137a79fade06bc9c07 Mon Sep 17 00:00:00 2001 From: yingsu00 Date: Mon, 23 Mar 2026 02:32:26 -0700 Subject: [PATCH 09/10] feat(PartitionedOutput): Add PrestoIterativePartitioningSerializer This commit introduces PrestoIterativePartitioningSerializer, which buffers RowVectors across multiple append() calls, partitions rows in-place using PartitionedVector, and on flush() serializes each non-empty partition into a Presto wire-format IOBuf. The serializer has no dependency on velox_exec: it returns raw folly::IOBuf objects, leaving SerializedPage creation to the caller. --- velox/serializers/CMakeLists.txt | 1 + .../PrestoIterativePartitioningSerializer.cpp | 731 ++++++++++++++++++ .../PrestoIterativePartitioningSerializer.h | 164 ++++ velox/serializers/benchmarks/CMakeLists.txt | 14 + ...erativePartitioningSerializerBenchmark.cpp | 177 +++++ velox/serializers/tests/CMakeLists.txt | 2 + ...stoIterativePartitioningSerializerTest.cpp | 707 +++++++++++++++++ 7 files changed, 1796 insertions(+) create mode 100644 velox/serializers/PrestoIterativePartitioningSerializer.cpp create mode 100644 velox/serializers/PrestoIterativePartitioningSerializer.h create mode 100644 velox/serializers/benchmarks/PrestoIterativePartitioningSerializerBenchmark.cpp create mode 100644 velox/serializers/tests/PrestoIterativePartitioningSerializerTest.cpp diff --git a/velox/serializers/CMakeLists.txt b/velox/serializers/CMakeLists.txt index 9c6390cc730..966e8422881 100644 --- a/velox/serializers/CMakeLists.txt +++ b/velox/serializers/CMakeLists.txt @@ -29,6 +29,7 @@ velox_add_library( UnsafeRowSerializer.cpp PrestoBatchVectorSerializer.cpp PrestoHeader.cpp + PrestoIterativePartitioningSerializer.cpp PrestoIterativeVectorSerializer.cpp PrestoSerializerDeserializationUtils.cpp PrestoSerializerEstimationUtils.cpp diff --git a/velox/serializers/PrestoIterativePartitioningSerializer.cpp b/velox/serializers/PrestoIterativePartitioningSerializer.cpp new file mode 100644 index 00000000000..d14c1ea96ab --- /dev/null +++ b/velox/serializers/PrestoIterativePartitioningSerializer.cpp @@ -0,0 +1,731 @@ +/* + * Copyright (c) International Business Machines Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "velox/serializers/PrestoIterativePartitioningSerializer.h" + +#include "velox/common/base/BitUtil.h" +#include "velox/type/Type.h" +#include "velox/vector/ComplexVector.h" +#include "velox/vector/FlatVector.h" + +namespace facebook::velox::serializer::presto { + +namespace { + +constexpr int8_t kCheckSumBitMask = 4; +constexpr int64_t kVectorSizeTypeSize{sizeof(vector_size_t)}; +// [numRows:4][codec:1] +constexpr int64_t kUncompressedSizeOffset{kVectorSizeTypeSize + 1}; +// [numRows:4][codec:1][uncompressedSize:4][compressedSize:4][checksum:8] +constexpr int64_t kHeaderSize{kUncompressedSizeOffset + 4 + 4 + 8}; + +static inline const std::string_view kByteArray{"BYTE_ARRAY"}; +static inline const std::string_view kShortArray{"SHORT_ARRAY"}; +static inline const std::string_view kIntArray{"INT_ARRAY"}; +static inline const std::string_view kLongArray{"LONG_ARRAY"}; +static inline const std::string_view kInt128Array{"INT128_ARRAY"}; +static inline const std::string_view kVariableWidth{"VARIABLE_WIDTH"}; +static inline const std::string_view kRow{"ROW"}; + +inline void writeInt32(OutputStream* out, int32_t value) { + out->write(reinterpret_cast(&value), sizeof(value)); +} + +inline void writeInt64(OutputStream* out, int64_t value) { + out->write(reinterpret_cast(&value), sizeof(value)); +} + +char getCodecMarker() { + char marker = 0; + marker |= kCheckSumBitMask; + return marker; +} + +std::string_view typeToEncodingName(const TypePtr& type) { + switch (type->kind()) { + case TypeKind::BOOLEAN: + case TypeKind::TINYINT: + return kByteArray; + case TypeKind::SMALLINT: + return kShortArray; + case TypeKind::INTEGER: + case TypeKind::REAL: + return kIntArray; + case TypeKind::BIGINT: + case TypeKind::DOUBLE: + case TypeKind::TIMESTAMP: + return kLongArray; + case TypeKind::HUGEINT: + return kInt128Array; + case TypeKind::VARCHAR: + case TypeKind::VARBINARY: + return kVariableWidth; + case TypeKind::ROW: + return kRow; + default: + VELOX_FAIL("Unsupported type kind: {}", static_cast(type->kind())); + } +} + +/// Finalizes the Presto page CRC by mixing in the codec marker, row count, +/// and uncompressed size on top of the listener's accumulated data checksum. +int64_t computeChecksum( + PrestoOutputStreamListener& listener, + int8_t codecMarker, + int32_t numRows, + int32_t uncompressedSize) { + auto crc = listener.crc(); + crc.process_bytes(&codecMarker, 1); + crc.process_bytes(&numRows, 4); + crc.process_bytes(&uncompressedSize, 4); + return static_cast(crc.checksum()); +} + +/// Returns the serialized byte width of a fixed-width type, matching the +/// sizeof(T) used in flushFlatValues. +int32_t fixedTypeWidth(TypeKind kind) { + switch (kind) { + case TypeKind::BOOLEAN: + case TypeKind::TINYINT: + return 1; + case TypeKind::SMALLINT: + return 2; + case TypeKind::INTEGER: + case TypeKind::REAL: + return 4; + case TypeKind::BIGINT: + case TypeKind::DOUBLE: + return 8; + case TypeKind::TIMESTAMP: + case TypeKind::HUGEINT: + return 16; + default: + return 0; + } +} + +/// Returns the exact bytes for one fixed-width column in one partition. +int64_t +simpleColumnBytes(const TypePtr& colType, int64_t numRows, int64_t numNulls) { + const auto encodingName = typeToEncodingName(colType); + return 4 + static_cast(encodingName.size()) + // header + 4 + // rowCount + 1 + // nullFlag + (numNulls > 0 ? bits::nbytes(numRows) : 0) + // null bitmap + (numRows - numNulls) * fixedTypeWidth(colType->kind()); // values +} + +/// Returns per-partition exact byte counts for one column (all partitions). +/// Recurses into nested ROW columns. +/// +/// Byte layout per column type: +/// Fixed-width: simpleColumnBytes(colType, numRows, numNulls) +/// ROW: 7 (header) + 4 (numFields) +/// + sum(child sizes) +/// + 4 (numRows) + 4*(numRows+1) (offsets) + 1 (hasNulls) +/// + (rowNulls>0 ? bits::nbytes(numRows) : 0) +std::vector computeColumnFlushSizes( + const std::vector& columnVectors, + const TypePtr& colType, + const std::vector& nonEmptyPartitions, + const std::vector& rowsPerPartition, + uint32_t numPartitions) { + std::vector sizes(numPartitions, 0); + + // Compute per-partition null counts by summing across batches. + std::vector nullCounts(numPartitions, 0); + for (uint32_t p : nonEmptyPartitions) { + for (const auto& pv : columnVectors) { + nullCounts[p] += pv->numNullsAt(p); + } + } + + switch (colType->kind()) { + case TypeKind::BOOLEAN: + case TypeKind::TINYINT: + case TypeKind::SMALLINT: + case TypeKind::INTEGER: + case TypeKind::BIGINT: + case TypeKind::REAL: + case TypeKind::DOUBLE: + case TypeKind::HUGEINT: + for (uint32_t p : nonEmptyPartitions) { + sizes[p] = + simpleColumnBytes(colType, rowsPerPartition[p], nullCounts[p]); + } + break; + + case TypeKind::TIMESTAMP: + case TypeKind::VARCHAR: + case TypeKind::VARBINARY: + case TypeKind::ARRAY: + case TypeKind::MAP: + VELOX_NYI( + "computeColumnFlushSizes: unsupported type kind {}", + TypeKindName::toName(colType->kind())); + + case TypeKind::ROW: { + const auto& rowSchema = colType->asRow(); + const int32_t numFields = static_cast(rowSchema.size()); + + // Fixed per-partition overhead: header(7) + numFields(4) + footer: + // numRows(4) + // + sequential offsets 4*(numRows+1) + hasNulls(1) + // + null bitmap for the ROW vector itself if any rows in this partition + // are null. + for (uint32_t p : nonEmptyPartitions) { + const int64_t numRows = rowsPerPartition[p]; + const int64_t rowNullBitmapBytes = + nullCounts[p] > 0 ? bits::nbytes(numRows) : 0; + sizes[p] = 7 + 4 + // "ROW" header + numFields + 4 + 4 * (numRows + 1) + 1 + // footer: numRows + offsets + hasNulls + rowNullBitmapBytes; + } + // Add child column sizes recursively. + for (uint32_t col = 0; col < static_cast(numFields); ++col) { + std::vector childVectors; + childVectors.reserve(columnVectors.size()); + for (const auto& pv : columnVectors) { + childVectors.push_back( + std::dynamic_pointer_cast(pv)->childAt( + col)); + } + const auto childSizes = computeColumnFlushSizes( + childVectors, + rowSchema.childAt(col), + nonEmptyPartitions, + rowsPerPartition, + numPartitions); + for (uint32_t p : nonEmptyPartitions) { + sizes[p] += childSizes[p]; + } + } + break; + } + + default: + VELOX_UNSUPPORTED( + "computeColumnFlushSizes: unsupported type kind {}", + TypeKindName::toName(colType->kind())); + } + return sizes; +} + +} // namespace + +PrestoIterativePartitioningSerializer::PrestoIterativePartitioningSerializer( + RowTypePtr inputType, + uint32_t numPartitions, + const SerdeOpts& opts, + memory::MemoryPool* pool) + : type_(std::move(inputType)), + numPartitions_(numPartitions), + opts_(opts), + pool_(pool), + rowsPerPartition_(numPartitions, 0) { + VELOX_CHECK_GT(numPartitions_, 0); + VELOX_CHECK_NOT_NULL(pool_); + + numColumns_ = type_->size(); +} + +void PrestoIterativePartitioningSerializer::append( + const RowVectorPtr& input, + const std::vector& partitions) { + VELOX_CHECK_NOT_NULL(input); + VELOX_CHECK_EQ( + input->size(), + partitions.size(), + "partitions.size() must equal input->size()"); + + if (input->size() == 0) { + return; + } + + PartitionBuildContext ctx; + auto partitionedRowVector = PartitionedVector::create( + std::static_pointer_cast(input), + partitions, + numPartitions_, + ctx, + pool_); + + const vector_size_t* partitionOffsets = + partitionedRowVector->rawPartitionOffsets(); + vector_size_t prevOffset = 0; + for (uint32_t p = 0; p < numPartitions_; ++p) { + rowsPerPartition_[p] += partitionOffsets[p] - prevOffset; + prevOffset = partitionOffsets[p]; + } + + partitionedRowVectors_.push_back(std::move(partitionedRowVector)); + + bytesBuffered_ += input->retainedSize(); + rowsBuffered_ += static_cast(input->size()); +} + +// --------------------------------------------------------------------------- +// Top-level flush +// --------------------------------------------------------------------------- + +std::map, vector_size_t>> +PrestoIterativePartitioningSerializer::flush() { + auto pages = + (opts_.compressionKind == common::CompressionKind::CompressionKind_NONE) + ? flushUncompressed() + : flushCompressed(); + + partitionedRowVectors_.clear(); + flushSizes_.clear(); + std::fill(rowsPerPartition_.begin(), rowsPerPartition_.end(), 0); + bytesBuffered_ = 0; + rowsBuffered_ = 0; + + return pages; +} + +std::map, vector_size_t>> +PrestoIterativePartitioningSerializer::flushUncompressed() { + if (partitionedRowVectors_.empty()) { + return {}; + } + + const char codecMask = getCodecMarker(); + + // 1. Determine non-empty partitions. + std::vector nonEmptyPartitions; + for (uint32_t p = 0; p < numPartitions_; ++p) { + if (rowsPerPartition_[p] > 0) { + nonEmptyPartitions.push_back(p); + } + } + + // 2. Pre-compute exact byte sizes per top-level column and partition. + const auto& rowSchema = type_->asRow(); + flushSizes_.assign(rowSchema.size(), std::vector(numPartitions_, 0)); + for (uint32_t col = 0; col < rowSchema.size(); ++col) { + std::vector columnVectors; + columnVectors.reserve(partitionedRowVectors_.size()); + for (const auto& pRowVector : partitionedRowVectors_) { + columnVectors.push_back( + std::dynamic_pointer_cast(pRowVector) + ->childAt(col)); + } + flushSizes_[col] = computeColumnFlushSizes( + columnVectors, + rowSchema.childAt(col), + nonEmptyPartitions, + rowsPerPartition_, + numPartitions_); + } + + // 3. Create output streams sized to the exact bytes each partition will need, + // so that the entire payload fits. This avoids multiple resizing and copying. + std::vector> listeners( + numPartitions_); + std::vector> outputStreams(numPartitions_); + std::vector rawOutputStreams(numPartitions_); + std::vector beginStreamPositions(numPartitions_); + + for (uint32_t p : nonEmptyPartitions) { + int64_t initialSize = kHeaderSize + 4; // page header + numCols + for (uint32_t col = 0; col < rowSchema.size(); ++col) { + initialSize += flushSizes_[col][p]; + } + listeners[p] = std::make_unique(); + outputStreams[p] = std::make_unique( + *pool_, listeners[p].get(), initialSize); + rawOutputStreams[p] = outputStreams[p].get(); + beginStreamPositions[p] = outputStreams[p]->tellp(); + + flushStart(*outputStreams[p], p, codecMask); + } + + // 4. Flush column data. + flushRowChildren( + partitionedRowVectors_, rowSchema, nonEmptyPartitions, rawOutputStreams); + + // 5. Finalize the page by seeking back to fill in sizes and CRC, and get the + // IOBuf and numOfRows from each stream. + std::map, vector_size_t>> + result; + for (uint32_t p : nonEmptyPartitions) { + flushFinish( + *outputStreams[p], + p, + beginStreamPositions[p], + codecMask, + *listeners[p]); + result[p] = + std::make_pair(outputStreams[p]->getIOBuf(), rowsPerPartition_[p]); + } + + return result; +} + +std::map, vector_size_t>> +PrestoIterativePartitioningSerializer::flushCompressed() { + VELOX_NYI(); +} + +// --------------------------------------------------------------------------- +// Second level functions: start, columns and finish +// --------------------------------------------------------------------------- + +void PrestoIterativePartitioningSerializer::flushStart( + IOBufOutputStream& out, + uint32_t partition, + char codecMask) const { + auto* listener = dynamic_cast(out.listener()); + if (listener) { + listener->pause(); + } + + // Write 21-byte Presto page header; sizes and CRC are filled in later. + const int32_t numRows = static_cast(rowsPerPartition_[partition]); + char header[kHeaderSize] = {}; + std::memcpy(&header[0], &numRows, 4); + std::memcpy(&header[4], &codecMask, 1); + out.write(header, kHeaderSize); + + if (listener) { + listener->resume(); + } + + // Number of columns is included in the CRC. + const int32_t numCols = static_cast(numColumns_); + out.write(reinterpret_cast(&numCols), 4); +} + +void PrestoIterativePartitioningSerializer::flushRowChildren( + const std::vector& partitionedVectors, + const RowType& rowSchema, + const std::vector& nonEmptyPartitions, + const std::vector& outputStreams) const { + for (uint32_t col = 0; col < rowSchema.size(); ++col) { + std::vector column; + column.reserve(partitionedVectors.size()); + for (const auto& partitionedVector : partitionedVectors) { + const auto& partitionedRowVector = + std::dynamic_pointer_cast(partitionedVector); + VELOX_DCHECK_NOT_NULL(partitionedRowVector.get()); + column.push_back(partitionedRowVector->childAt(col)); + } + + flushColumn( + column, rowSchema.childAt(col), nonEmptyPartitions, outputStreams); + } +} + +void PrestoIterativePartitioningSerializer::flushFinish( + IOBufOutputStream& out, + uint32_t partition, + std::streampos beginOffset, + char codecMask, + PrestoOutputStreamListener& listener) const { + listener.pause(); + + const std::streampos totalSize = + static_cast(out.tellp() - beginOffset); + const std::streampos uncompressedSize = totalSize - kHeaderSize; + const int64_t crc = computeChecksum( + listener, + static_cast(codecMask), + static_cast(rowsPerPartition_[partition]), + uncompressedSize); + + out.seekp(beginOffset + kUncompressedSizeOffset); + writeInt32(&out, uncompressedSize); + writeInt32(&out, uncompressedSize); // TODO: compressedSize + writeInt64(&out, crc); + out.seekp(beginOffset + totalSize); +} + +// --------------------------------------------------------------------------- +// Column-level dispatch +// --------------------------------------------------------------------------- + +void PrestoIterativePartitioningSerializer::flushColumn( + const std::vector& partitionedVectors, + const TypePtr& colType, + const std::vector& nonEmptyPartitions, + const std::vector& outputStreams) const { + VELOX_CHECK_GT(partitionedVectors.size(), 0); + + auto typeKind = partitionedVectors[0]->baseVector()->typeKind(); + switch (typeKind) { + case TypeKind::BOOLEAN: + case TypeKind::TINYINT: + case TypeKind::SMALLINT: + case TypeKind::INTEGER: + case TypeKind::BIGINT: + case TypeKind::REAL: + case TypeKind::DOUBLE: + case TypeKind::HUGEINT: + flushSimpleColumn( + partitionedVectors, colType, nonEmptyPartitions, outputStreams); + break; + + case TypeKind::TIMESTAMP: + case TypeKind::VARCHAR: + case TypeKind::VARBINARY: + case TypeKind::ROW: + case TypeKind::ARRAY: + case TypeKind::MAP: + VELOX_NYI(); + + default: + VELOX_UNSUPPORTED( + "Invalid vector encoding for OptimizedPartitionedOutput: ", typeKind); + } +} + +void PrestoIterativePartitioningSerializer::flushSimpleColumn( + const std::vector& partitionedVectors, + const TypePtr& colType, + const std::vector& nonEmptyPartitions, + const std::vector& outputStreams) const { + flushHeader(typeToEncodingName(colType), nonEmptyPartitions, outputStreams); + flushRowCounts(nonEmptyPartitions, outputStreams); + flushNulls(partitionedVectors, nonEmptyPartitions, outputStreams); + + for (size_t i = 0; i < partitionedVectors.size(); i++) { + flushSingleSimpleVector(partitionedVectors[i], outputStreams); + } +} + +template +void PrestoIterativePartitioningSerializer::flushSingleFlatVector( + const PartitionedVectorPtr& partitionedVector, + const std::vector& outputStreams) const { + using T = typename TypeTraits::NativeType; + auto* flatVector = partitionedVector->as>(); + VELOX_DCHECK_NOT_NULL(flatVector); + + const auto* rawValues = + flatVector->baseVector()->template as>()->rawValues(); + const auto* rawNulls = flatVector->baseVector()->rawNulls(); + const auto* partitionOffsets = flatVector->rawPartitionOffsets(); + + flushFlatValues(rawValues, rawNulls, partitionOffsets, outputStreams); +} + +// BOOLEAN columns use kByteArray encoding: FlatVector stores bits +// packed, so rawValues() is unsupported. Each non-null value is written as +// one byte (0x00 or 0x01). +template <> +void PrestoIterativePartitioningSerializer::flushSingleFlatVector< + TypeKind::BOOLEAN>( + const PartitionedVectorPtr& partitionedVector, + const std::vector& outputStreams) const { + auto* flatVector = partitionedVector->as>(); + VELOX_DCHECK_NOT_NULL(flatVector); + + const auto* rawBoolValues = + flatVector->baseVector()->as>()->rawValues(); + const auto* rawNulls = flatVector->baseVector()->rawNulls(); + const auto* partitionOffsets = flatVector->rawPartitionOffsets(); + + // TODO: Improve performance + vector_size_t lastOffset = 0; + for (uint32_t p = 0; p < numPartitions_; ++p) { + const auto offset = partitionOffsets[p]; + const auto numValues = offset - lastOffset; + const auto numNulls = partitionedVector->numNullsAt(p); + if (outputStreams[p] != nullptr && numValues > 0) { + if (numNulls == 0) { + for (vector_size_t i = lastOffset; i < offset; ++i) { + const int8_t val = bits::isBitSet(rawBoolValues, i) ? 1 : 0; + outputStreams[p]->write(reinterpret_cast(&val), 1); + } + } else { + VELOX_DCHECK_NOT_NULL(rawNulls); + for (vector_size_t i = lastOffset; i < offset; ++i) { + if (!bits::isBitNull(rawNulls, i)) { + const int8_t val = bits::isBitSet(rawBoolValues, i) ? 1 : 0; + outputStreams[p]->write(reinterpret_cast(&val), 1); + } + } + } + } + lastOffset = offset; + } +} + +void PrestoIterativePartitioningSerializer::flushSingleSimpleVector( + const PartitionedVectorPtr& partitionedVector, + const std::vector& outputStreams) const { + auto encoding = partitionedVector->baseVector()->encoding(); + auto typeKind = partitionedVector->baseVector()->typeKind(); + + switch (encoding) { + case VectorEncoding::Simple::FLAT: + VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH( + flushSingleFlatVector, typeKind, partitionedVector, outputStreams); + break; + case VectorEncoding::Simple::BIASED: + case VectorEncoding::Simple::CONSTANT: + case VectorEncoding::Simple::DICTIONARY: + case VectorEncoding::Simple::SEQUENCE: + VELOX_NYI( + "Unsupported vector encoding for OptimizedPartitionedOutput: ", + encoding); + default: + VELOX_UNSUPPORTED( + "Invalid vector encoding for OptimizedPartitionedOutput:flushSingleSimpleVector ", + encoding); + } +} + +// --------------------------------------------------------------------------- +// Column building blocks +// --------------------------------------------------------------------------- + +void PrestoIterativePartitioningSerializer::flushHeader( + std::string_view name, + const std::vector& nonEmptyPartitions, + const std::vector& outputStreams) const { + const int32_t nameLen = static_cast(name.size()); + for (uint32_t p : nonEmptyPartitions) { + writeInt32(outputStreams[p], nameLen); + outputStreams[p]->write(name.data(), nameLen); + } +} + +void PrestoIterativePartitioningSerializer::flushRowCounts( + const std::vector& nonEmptyPartitions, + const std::vector& outputStreams) const { + for (uint32_t p : nonEmptyPartitions) { + writeInt32(outputStreams[p], static_cast(rowsPerPartition_[p])); + } +} + +void PrestoIterativePartitioningSerializer::flushNulls( + const std::vector& partitionedVectors, + const std::vector& nonEmptyPartitions, + const std::vector& outputStreams) const { + std::vector nullCounts(numPartitions_, 0); + for (uint32_t p : nonEmptyPartitions) { + for (const auto& pv : partitionedVectors) { + nullCounts[p] += pv->numNullsAt(p); + } + const char flagByte = nullCounts[p] > 0 ? 1 : 0; + outputStreams[p]->write(&flagByte, 1); + } + + const bool hasAnyNulls = std::any_of( + nonEmptyPartitions.begin(), nonEmptyPartitions.end(), [&](uint32_t p) { + return nullCounts[p] > 0; + }); + if (!hasAnyNulls) { + return; + } + + // Build each partition's null bitmap in a temporary buffer, accumulating + // bits across all batches. Writing via write() correctly handles range + // boundaries in the output stream without requiring seekp(). + // TODO: Avoid this extra memory allocation and copy + std::vector> bitmaps(numPartitions_); + for (uint32_t p : nonEmptyPartitions) { + if (nullCounts[p] > 0) { + bitmaps[p].assign(bits::nbytes(rowsPerPartition_[p]), 0); + } + } + + std::vector destBitOffsets(numPartitions_, 0); + for (const auto& pv : partitionedVectors) { + const uint64_t* rawNulls = pv->baseVector()->rawNulls(); + const auto* partitionOffsets = pv->rawPartitionOffsets(); + + vector_size_t startBit = 0; + for (uint32_t p : nonEmptyPartitions) { + const vector_size_t numBits = partitionOffsets[p] - startBit; + if (rawNulls && numBits > 0 && !bitmaps[p].empty()) { + bits::copyBits( + rawNulls, + startBit, + reinterpret_cast(bitmaps[p].data()), + destBitOffsets[p], + numBits); + } + if (!bitmaps[p].empty()) { + destBitOffsets[p] += numBits; + } + startBit = partitionOffsets[p]; + } + } + + for (uint32_t p : nonEmptyPartitions) { + if (nullCounts[p] == 0) { + continue; + } + + // Convert Velox format (LSB-first, 1=not-null) to Presto wire format + // (MSB-first, 1=null) in-place. + const int32_t numBytes = bits::nbytes(rowsPerPartition_[p]); + for (int32_t i = 0; i < numBytes; ++i) { + bitmaps[p][i] = ~bitmaps[p][i]; + bits::reverseBits(&bitmaps[p][i], 1); + } + + outputStreams[p]->write( + reinterpret_cast(bitmaps[p].data()), numBytes); + } +} + +template +void PrestoIterativePartitioningSerializer::flushFlatValues( + const T* partitionedValues, + const uint64_t* rawNulls, + const vector_size_t* partitionOffsets, + const std::vector& outputStreams) const { + const auto typeWidth = sizeof(T); + vector_size_t lastOffset = 0; + for (uint32_t p = 0; p < numPartitions_; ++p) { + const auto offset = partitionOffsets[p]; + const auto numValues = offset - lastOffset; + if (outputStreams[p] != nullptr && numValues > 0) { + if (!rawNulls) { + outputStreams[p]->write( + reinterpret_cast(&partitionedValues[lastOffset]), + numValues * typeWidth); + } else { + // Presto writes only non-null values; null slots are omitted. + // TODO: Improve performance + for (vector_size_t i = lastOffset; i < offset; ++i) { + if (!bits::isBitNull(rawNulls, i)) { + outputStreams[p]->write( + reinterpret_cast(&partitionedValues[i]), + typeWidth); + } + } + } + } + lastOffset = offset; + } +} + +void PrestoIterativePartitioningSerializer::flushSequentialOffsets( + const std::vector& nonEmptyPartitions, + const std::vector& outputStreams) const { + for (uint32_t p : nonEmptyPartitions) { + const int32_t numRows = static_cast(rowsPerPartition_[p]); + for (int32_t i = 0; i <= numRows; ++i) { + writeInt32(outputStreams[p], i); + } + } +} + +} // namespace facebook::velox::serializer::presto diff --git a/velox/serializers/PrestoIterativePartitioningSerializer.h b/velox/serializers/PrestoIterativePartitioningSerializer.h new file mode 100644 index 00000000000..b9e41286ea6 --- /dev/null +++ b/velox/serializers/PrestoIterativePartitioningSerializer.h @@ -0,0 +1,164 @@ +/* + * Copyright (c) International Business Machines Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include + +#include + +#include "velox/common/memory/ByteStream.h" +#include "velox/serializers/PrestoSerializer.h" +#include "velox/type/Type.h" +#include "velox/vector/PartitionedVector.h" + +namespace facebook::velox::serializer::presto { + +/// Convenience alias matching PrestoSerializer.cpp convention. +using SerdeOpts = PrestoVectorSerde::PrestoOptions; + +/// Serializes a stream of RowVectors into per-partition Presto pages. +/// +/// Each call to append() routes rows to their assigned partition. flush() +/// produces one Presto-format IOBuf per non-empty partition and resets the +/// internal state so the serializer can be reused for the next cycle. +class PrestoIterativePartitioningSerializer { + public: + PrestoIterativePartitioningSerializer( + RowTypePtr inputType, + uint32_t numPartitions, + const SerdeOpts& opts, + memory::MemoryPool* pool); + + /// Routes each row in `input` to the partition indicated by `partitions`. + /// `partitions.size()` must equal `input->size()`. + void append( + const RowVectorPtr& input, + const std::vector& partitions); + + /// Serializes all buffered data into one Presto page per non-empty partition + /// and resets internal state. Returns an empty map if nothing has been + /// appended since the last flush. + std::map, vector_size_t>> + flush(); + + /// Returns the total retained bytes of all appended input vectors. + int64_t bytesBuffered() const { + return bytesBuffered_; + } + + /// Returns the total number of rows appended since the last flush. + int64_t rowsBuffered() const { + return rowsBuffered_; + } + + /// Returns the number of rows buffered for the given partition. + /// Must be called before flush(), which resets per-partition counts. + int64_t rowsPerPartition(uint32_t partition) const { + VELOX_DCHECK_LT(partition, numPartitions_); + return rowsPerPartition_[partition]; + } + + private: + std::map, vector_size_t>> + flushUncompressed(); + std::map, vector_size_t>> + flushCompressed(); + + void flushStart(IOBufOutputStream& out, uint32_t partition, char codecMask) + const; + + void flushFinish( + IOBufOutputStream& out, + uint32_t partition, + std::streampos beginOffset, + char codecMask, + PrestoOutputStreamListener& listener) const; + + void flushRowChildren( + const std::vector& partitionedVectors, + const RowType& rowSchema, + const std::vector& nonEmptyPartitions, + const std::vector& outputStreams) const; + + void flushColumn( + const std::vector& partitionedVectors, + const TypePtr& colType, + const std::vector& nonEmptyPartitions, + const std::vector& outputStreams) const; + + void flushSimpleColumn( + const std::vector& partitionedVectors, + const TypePtr& colType, + const std::vector& nonEmptyPartitions, + const std::vector& outputStreams) const; + + void flushSingleSimpleVector( + const PartitionedVectorPtr& partitionedVector, + const std::vector& outputStreams) const; + + template + void flushSingleFlatVector( + const PartitionedVectorPtr& partitionedVector, + const std::vector& outputStreams) const; + + void flushHeader( + std::string_view name, + const std::vector& nonEmptyPartitions, + const std::vector& outputStreams) const; + + void flushRowCounts( + const std::vector& nonEmptyPartitions, + const std::vector& outputStreams) const; + + void flushNulls( + const std::vector& partitionedVectors, + const std::vector& nonEmptyPartitions, + const std::vector& outputStreams) const; + + template + void flushFlatValues( + const T* partitionedValues, + const uint64_t* rawNulls, + const vector_size_t* partitionOffsets, + const std::vector& outputStreams) const; + + void flushSequentialOffsets( + const std::vector& nonEmptyPartitions, + const std::vector& outputStreams) const; + + RowTypePtr type_; + uint32_t numPartitions_; + SerdeOpts opts_; + memory::MemoryPool* pool_; + + /// Cumulative row count per partition across all appended batches. + std::vector rowsPerPartition_; + + /// Number of top-level columns in `type_`. + uint32_t numColumns_{0}; + + std::vector partitionedRowVectors_; + + int64_t bytesBuffered_{0}; + int64_t rowsBuffered_{0}; + + /// Per-column, per-partition exact byte counts computed during flush. + std::vector> flushSizes_; +}; + +} // namespace facebook::velox::serializer::presto diff --git a/velox/serializers/benchmarks/CMakeLists.txt b/velox/serializers/benchmarks/CMakeLists.txt index 7d1044e4367..a81530595e8 100644 --- a/velox/serializers/benchmarks/CMakeLists.txt +++ b/velox/serializers/benchmarks/CMakeLists.txt @@ -21,3 +21,17 @@ target_link_libraries( Folly::folly Folly::follybenchmark ) + +add_executable( + velox_presto_iterative_partitioning_serializer_benchmark + PrestoIterativePartitioningSerializerBenchmark.cpp +) + +target_link_libraries( + velox_presto_iterative_partitioning_serializer_benchmark + velox_presto_serializer + velox_vector_test_lib + velox_memory + Folly::folly + Folly::follybenchmark +) diff --git a/velox/serializers/benchmarks/PrestoIterativePartitioningSerializerBenchmark.cpp b/velox/serializers/benchmarks/PrestoIterativePartitioningSerializerBenchmark.cpp new file mode 100644 index 00000000000..3244281a5dc --- /dev/null +++ b/velox/serializers/benchmarks/PrestoIterativePartitioningSerializerBenchmark.cpp @@ -0,0 +1,177 @@ +/* + * Copyright (c) International Business Machines Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include + +#include "velox/serializers/PrestoIterativePartitioningSerializer.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +using namespace facebook::velox; +using namespace facebook::velox::serializer::presto; + +constexpr int64_t kBufferSize = 2 * 1024 * 1024; + +namespace { + +class PrestoIterativePartitioningSerializerBenchmark + : public test::VectorTestBase { + public: + /// Creates a flat vector of type T with deterministic null pattern. + /// Rows where (row % 100) < nullPct are null. + template + VectorPtr makeColumnOfType(vector_size_t size, int32_t nullPct) { + if (nullPct == 0) { + return makeFlatVector( + size, [](auto row) { return static_cast(row); }); + } + return makeFlatVector( + size, + [](auto row) { return static_cast(row); }, + [nullPct](auto row) { return (row % 100) < nullPct; }); + } + + /// Creates a flat vector of the given TypeKind with the given null ratio. + VectorPtr makeColumn(vector_size_t size, TypeKind colKind, int32_t nullPct) { + switch (colKind) { + case TypeKind::BOOLEAN: + return makeColumnOfType(size, nullPct); + case TypeKind::INTEGER: + return makeColumnOfType(size, nullPct); + case TypeKind::BIGINT: + return makeColumnOfType(size, nullPct); + case TypeKind::HUGEINT: + return makeColumnOfType(size, nullPct); + default: + VELOX_UNSUPPORTED( + "Unsupported TypeKind: {}", TypeKindName::toName(colKind)); + } + } + + /// Creates a RowVector with numCols columns of the given TypeKind. + RowVectorPtr makeInput( + vector_size_t size, + TypeKind colKind, + uint32_t numCols, + int32_t nullPct) { + std::vector names; + std::vector children; + names.reserve(numCols); + children.reserve(numCols); + for (uint32_t i = 0; i < numCols; ++i) { + names.push_back(fmt::format("c{}", i)); + children.push_back(makeColumn(size, colKind, nullPct)); + } + return makeRowVector(names, children); + } + + std::vector makePartitions( + vector_size_t size, + uint32_t numPartitions) { + std::vector partitions(size); + for (vector_size_t i = 0; i < size; ++i) { + partitions[i] = i % numPartitions; + } + return partitions; + } + + std::unique_ptr makeSerializer( + const RowTypePtr& type, + uint32_t numPartitions) { + SerdeOpts opts; + return std::make_unique( + type, numPartitions, opts, pool_.get()); + } +}; + +} // namespace + +/// Single benchmark function parameterized by (colKind, numCols, nullPct, +/// numPartitions). Registered via BENCHMARK_NAMED_PARAM below. +/// +/// All runs use 10'000 rows. Setup (input creation, serializer construction, +/// append) is excluded from the measured time. +void benchmarkFlush( + uint32_t /* iters */, + TypeKind colKind, + uint32_t numCols, + int32_t nullPct, + uint32_t numPartitions) { + folly::BenchmarkSuspender suspender; + PrestoIterativePartitioningSerializerBenchmark benchmark; + auto input = benchmark.makeInput(10'000, colKind, numCols, nullPct); + auto parts = benchmark.makePartitions(10'000, numPartitions); + auto serializer = benchmark.makeSerializer( + std::static_pointer_cast(input->type()), numPartitions); + + while (serializer->bytesBuffered() < kBufferSize) { + serializer->append(input, parts); + } + + suspender.dismiss(); + + auto result = serializer->flush(); + folly::doNotOptimizeAway(result); +} + +// clang-format off +// Dimensions: +// col type: {bool, int, bigint, hugeint} +// num cols: {1, 4, 16, 64} +// null pct: {0, 25, 50, 75, 100} +// num partitions: {1, 4, 16, 64, 256, 1024} +// +// Naming: flush__cols_

pct_parts + +#define FLUSH_PARAM(type_name, kind, num_cols, null_pct, num_parts) \ + BENCHMARK_NAMED_PARAM( \ + benchmarkFlush, \ + type_name## _## num_cols## cols_## null_pct## pct_## num_parts## parts, \ + TypeKind::kind, num_cols, null_pct, num_parts) + +#define FLUSH_FOR_PARTS(type_name, kind, num_cols, null_pct) \ + FLUSH_PARAM(type_name, kind, num_cols, null_pct, 1) \ + FLUSH_PARAM(type_name, kind, num_cols, null_pct, 4) \ + FLUSH_PARAM(type_name, kind, num_cols, null_pct, 16) \ + FLUSH_PARAM(type_name, kind, num_cols, null_pct, 64) \ + FLUSH_PARAM(type_name, kind, num_cols, null_pct, 256) \ + FLUSH_PARAM(type_name, kind, num_cols, null_pct, 1024) + +#define FLUSH_FOR_NULLS(type_name, kind, num_cols) \ + FLUSH_FOR_PARTS(type_name, kind, num_cols, 0) \ + FLUSH_FOR_PARTS(type_name, kind, num_cols, 25) \ + FLUSH_FOR_PARTS(type_name, kind, num_cols, 50) \ + FLUSH_FOR_PARTS(type_name, kind, num_cols, 75) \ + FLUSH_FOR_PARTS(type_name, kind, num_cols, 100) + +#define FLUSH_FOR_COLS(type_name, kind) \ + FLUSH_FOR_NULLS(type_name, kind, 1) \ + FLUSH_FOR_NULLS(type_name, kind, 4) \ + FLUSH_FOR_NULLS(type_name, kind, 16) \ + FLUSH_FOR_NULLS(type_name, kind, 64) + +FLUSH_FOR_COLS(bool, BOOLEAN) +FLUSH_FOR_COLS(int, INTEGER) +FLUSH_FOR_COLS(bigint, BIGINT) +FLUSH_FOR_COLS(ldec, HUGEINT) +// clang-format on + +int main(int argc, char** argv) { + folly::Init init{&argc, &argv}; + memory::MemoryManager::initialize(memory::MemoryManager::Options{}); + PrestoVectorSerde::registerVectorSerde(); + folly::runBenchmarks(); + return 0; +} diff --git a/velox/serializers/tests/CMakeLists.txt b/velox/serializers/tests/CMakeLists.txt index d54e9ff9574..66eaa6d5bd4 100644 --- a/velox/serializers/tests/CMakeLists.txt +++ b/velox/serializers/tests/CMakeLists.txt @@ -35,6 +35,7 @@ target_link_libraries( add_executable( velox_serializer_test CompactRowSerializerTest.cpp + PrestoIterativePartitioningSerializerTest.cpp PrestoOutputStreamListenerTest.cpp PrestoSerializerTest.cpp SerializedPageFileTest.cpp @@ -52,6 +53,7 @@ target_link_libraries( velox_row_fast GTest::gtest GTest::gtest_main + GTest::gmock glog::glog ) diff --git a/velox/serializers/tests/PrestoIterativePartitioningSerializerTest.cpp b/velox/serializers/tests/PrestoIterativePartitioningSerializerTest.cpp new file mode 100644 index 00000000000..e3535a54406 --- /dev/null +++ b/velox/serializers/tests/PrestoIterativePartitioningSerializerTest.cpp @@ -0,0 +1,707 @@ +/* + * Copyright (c) International Business Machines Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include + +#include "velox/serializers/PrestoIterativePartitioningSerializer.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +using namespace facebook::velox; +using namespace facebook::velox::serializer::presto; +using namespace facebook::velox::test; + +class PrestoIterativePartitioningSerializerTest : public ::testing::Test, + public VectorTestBase { + protected: + static void SetUpTestSuite() { + memory::MemoryManager::testingSetInstance(memory::MemoryManager::Options{}); + if (!isRegisteredVectorSerde()) { + PrestoVectorSerde::registerVectorSerde(); + } + } + + /// Deserializes an IOBuf produced by PartitioningSerializer::flush(). + RowVectorPtr deserialize(folly::IOBuf& iobuf, const RowTypePtr& type) { + auto ranges = byteRangesFromIOBuf(&iobuf); + BufferInputStream stream(std::move(ranges)); + RowVectorPtr result; + serde_.deserialize(&stream, pool_.get(), type, &result, nullptr); + return result; + } + + /// Extracts flat values from a column into a sorted vector. + template + std::vector sortedValues(const RowVectorPtr& row, int column) { + auto* flat = row->childAt(column)->as>(); + std::vector vals(flat->rawValues(), flat->rawValues() + row->size()); + std::sort(vals.begin(), vals.end()); + return vals; + } + + /// Extracts values from a nullable column, preserving order and nulls. + template + std::vector> nullableValues( + const RowVectorPtr& row, + int column) { + auto* vec = row->childAt(column).get(); + std::vector> result; + result.reserve(row->size()); + for (int i = 0; i < row->size(); ++i) { + if (vec->isNullAt(i)) { + result.push_back(std::nullopt); + } else { + result.push_back(vec->as>()->valueAt(i)); + } + } + return result; + } + + /// Builds a PrestoIterativePartitioningSerializer with default serde options. + std::unique_ptr makeSerializer( + const RowTypePtr& type, + uint32_t numPartitions) { + SerdeOpts opts; + return std::make_unique( + type, numPartitions, opts, pool_.get()); + } + + PrestoVectorSerde serde_; +}; + +// ── Routing ────────────────────────────────────────────────────────────────── + +// Single append, two equal-sized partitions. +TEST_F(PrestoIterativePartitioningSerializerTest, basicTwoPartitions) { + auto type = ROW({"a"}, {BIGINT()}); + auto input = + makeRowVector({"a"}, {makeFlatVector({10, 20, 30, 40, 50, 60})}); + + // Even rows → partition 0, odd rows → partition 1. + auto serializer = makeSerializer(type, 2); + serializer->append(input, {0, 1, 0, 1, 0, 1}); + + EXPECT_EQ(serializer->rowsBuffered(), 6); + + auto ioBufs = serializer->flush(); + ASSERT_EQ(ioBufs.size(), 2); + + EXPECT_EQ(serializer->rowsBuffered(), 0); + EXPECT_EQ(serializer->bytesBuffered(), 0); + + auto p0 = deserialize(*ioBufs.at(0).first, type); + auto p1 = deserialize(*ioBufs.at(1).first, type); + + ASSERT_EQ(p0->size(), 3); + ASSERT_EQ(p1->size(), 3); + + EXPECT_EQ(sortedValues(p0, 0), (std::vector{10, 30, 50})); + EXPECT_EQ(sortedValues(p1, 0), (std::vector{20, 40, 60})); +} + +// All rows routed to one non-zero partition; other partitions are absent. +TEST_F(PrestoIterativePartitioningSerializerTest, allRowsToOnePartition) { + auto type = ROW({"x"}, {INTEGER()}); + auto input = makeRowVector({"x"}, {makeFlatVector({1, 2, 3, 4, 5})}); + + auto serializer = makeSerializer(type, 4); + serializer->append(input, {2, 2, 2, 2, 2}); + auto ioBufs = serializer->flush(); + + ASSERT_EQ(ioBufs.size(), 1); + ASSERT_TRUE(ioBufs.count(2)); + + auto result = deserialize(*ioBufs.at(2).first, type); + ASSERT_EQ(result->size(), 5); + EXPECT_EQ( + sortedValues(result, 0), (std::vector{1, 2, 3, 4, 5})); +} + +// Single partition (numPartitions=1): all rows go to partition 0. +TEST_F(PrestoIterativePartitioningSerializerTest, singlePartition) { + auto type = ROW({"a"}, {BIGINT()}); + auto input = makeRowVector({"a"}, {makeFlatVector({1, 2, 3, 4, 5})}); + + auto serializer = makeSerializer(type, 1); + serializer->append(input, std::vector(5, 0)); + auto ioBufs = serializer->flush(); + + ASSERT_EQ(ioBufs.size(), 1); + auto result = deserialize(*ioBufs.at(0).first, type); + ASSERT_EQ(result->size(), 5); + EXPECT_EQ( + sortedValues(result, 0), (std::vector{1, 2, 3, 4, 5})); +} + +// Multiple append() calls accumulate correctly before flush. +TEST_F(PrestoIterativePartitioningSerializerTest, multipleAppends) { + auto type = ROW({"v"}, {BIGINT()}); + auto serializer = makeSerializer(type, 3); + + serializer->append( + makeRowVector({"v"}, {makeFlatVector({100, 200, 300})}), + {0, 1, 2}); + serializer->append( + makeRowVector({"v"}, {makeFlatVector({400, 500, 600})}), + {2, 0, 1}); + + EXPECT_EQ(serializer->rowsBuffered(), 6); + + auto ioBufs = serializer->flush(); + ASSERT_EQ(ioBufs.size(), 3); + + auto r0 = deserialize(*ioBufs.at(0).first, type); + auto r1 = deserialize(*ioBufs.at(1).first, type); + auto r2 = deserialize(*ioBufs.at(2).first, type); + + ASSERT_EQ(r0->size(), 2); + ASSERT_EQ(r1->size(), 2); + ASSERT_EQ(r2->size(), 2); + + EXPECT_EQ(sortedValues(r0, 0), (std::vector{100, 500})); + EXPECT_EQ(sortedValues(r1, 0), (std::vector{200, 600})); + EXPECT_EQ(sortedValues(r2, 0), (std::vector{300, 400})); +} + +// Multiple columns: each is serialized independently by flushRowChildren. +TEST_F(PrestoIterativePartitioningSerializerTest, multipleColumns) { + auto type = ROW({"a", "b"}, {INTEGER(), BIGINT()}); + auto input = makeRowVector( + {"a", "b"}, + {makeFlatVector({1, 2, 3, 4}), + makeFlatVector({10, 20, 30, 40})}); + + auto serializer = makeSerializer(type, 2); + serializer->append(input, {0, 0, 1, 1}); + auto ioBufs = serializer->flush(); + + ASSERT_EQ(ioBufs.size(), 2); + + auto r0 = deserialize(*ioBufs.at(0).first, type); + ASSERT_EQ(r0->size(), 2); + EXPECT_THAT(sortedValues(r0, 0), testing::ElementsAre(1, 2)); + EXPECT_THAT(sortedValues(r0, 1), testing::ElementsAre(10, 20)); + + auto r1 = deserialize(*ioBufs.at(1).first, type); + ASSERT_EQ(r1->size(), 2); + EXPECT_THAT(sortedValues(r1, 0), testing::ElementsAre(3, 4)); + EXPECT_THAT(sortedValues(r1, 1), testing::ElementsAre(30, 40)); +} + +// ── Empty state +// ─────────────────────────────────────────────────────────────── + +// Flushing an empty serializer returns an empty map. +TEST_F(PrestoIterativePartitioningSerializerTest, flushEmpty) { + auto type = ROW({"a"}, {BIGINT()}); + auto serializer = makeSerializer(type, 3); + EXPECT_TRUE(serializer->flush().empty()); +} + +// Appending an empty RowVector produces no ioBufs on flush. +TEST_F(PrestoIterativePartitioningSerializerTest, appendEmptyVector) { + auto type = ROW({"a"}, {BIGINT()}); + auto serializer = makeSerializer(type, 2); + serializer->append(makeRowVector({"a"}, {makeFlatVector({})}), {}); + EXPECT_TRUE(serializer->flush().empty()); +} + +// ── Null handling +// ───────────────────────────────────────────────────────────── + +// Nulls appear only in one partition; the other partition is null-free. +TEST_F(PrestoIterativePartitioningSerializerTest, nullsInOnePartition) { + auto type = ROW({"a"}, {BIGINT()}); + using opt = std::optional; + // Rows 0,1,2 → partition 0; rows 3,4 → partition 1. + // Partition 0 has a null at position 1; partition 1 has no nulls. + auto input = makeRowVector( + {"a"}, {makeNullableFlatVector({10, std::nullopt, 30, 40, 50})}); + + auto serializer = makeSerializer(type, 2); + serializer->append(input, {0, 0, 0, 1, 1}); + auto ioBufs = serializer->flush(); + + ASSERT_EQ(ioBufs.size(), 2); + + auto r0 = deserialize(*ioBufs.at(0).first, type); + ASSERT_EQ(r0->size(), 3); + EXPECT_THAT( + nullableValues(r0, 0), + testing::ElementsAre(opt{10}, std::nullopt, opt{30})); + + auto r1 = deserialize(*ioBufs.at(1).first, type); + ASSERT_EQ(r1->size(), 2); + EXPECT_THAT( + nullableValues(r1, 0), testing::ElementsAre(opt{40}, opt{50})); +} + +// Both partitions contain nulls. +TEST_F(PrestoIterativePartitioningSerializerTest, nullsInBothPartitions) { + auto type = ROW({"a"}, {BIGINT()}); + using opt = std::optional; + // Rows 0,1 → partition 0; rows 2,3 → partition 1. + // Partition 0: [10, null]; partition 1: [null, 40]. + auto input = makeRowVector( + {"a"}, + {makeNullableFlatVector({10, std::nullopt, std::nullopt, 40})}); + + auto serializer = makeSerializer(type, 2); + serializer->append(input, {0, 0, 1, 1}); + auto ioBufs = serializer->flush(); + + ASSERT_EQ(ioBufs.size(), 2); + + auto r0 = deserialize(*ioBufs.at(0).first, type); + EXPECT_THAT( + nullableValues(r0, 0), + testing::ElementsAre(opt{10}, std::nullopt)); + + auto r1 = deserialize(*ioBufs.at(1).first, type); + EXPECT_THAT( + nullableValues(r1, 0), + testing::ElementsAre(std::nullopt, opt{40})); +} + +// All rows in one partition are null. +TEST_F(PrestoIterativePartitioningSerializerTest, allNullsInPartition) { + auto type = ROW({"a"}, {BIGINT()}); + using opt = std::optional; + // Partition 0: two nulls. Partition 1: one non-null. + auto input = makeRowVector( + {"a"}, + {makeNullableFlatVector({std::nullopt, std::nullopt, 30})}); + + auto serializer = makeSerializer(type, 2); + serializer->append(input, {0, 0, 1}); + auto ioBufs = serializer->flush(); + + ASSERT_EQ(ioBufs.size(), 2); + + auto r0 = deserialize(*ioBufs.at(0).first, type); + EXPECT_THAT( + nullableValues(r0, 0), + testing::ElementsAre(std::nullopt, std::nullopt)); + + auto r1 = deserialize(*ioBufs.at(1).first, type); + EXPECT_THAT(nullableValues(r1, 0), testing::ElementsAre(opt{30})); +} + +// Nulls contributed by different appends to the same partition, exercising +// carry-over between vectors. +TEST_F(PrestoIterativePartitioningSerializerTest, nullsAcrossMultipleAppends) { + auto type = ROW({"a"}, {BIGINT()}); + using opt = std::optional; + auto serializer = makeSerializer(type, 2); + + // Append 1: rows 0,1 → partition 0; row 2 → partition 1. + // Partition 0 gets [10, null]; partition 1 gets [30]. + serializer->append( + makeRowVector( + {"a"}, {makeNullableFlatVector({10, std::nullopt, 30})}), + {0, 0, 1}); + + // Append 2: row 0 → partition 0; row 1 → partition 1. + // Partition 0 gets [null]; partition 1 gets [50]. + serializer->append( + makeRowVector( + {"a"}, {makeNullableFlatVector({std::nullopt, 50})}), + {0, 1}); + + auto ioBufs = serializer->flush(); + ASSERT_EQ(ioBufs.size(), 2); + + // Partition 0 should have [10, null] from append 1 + [null] from append 2. + auto r0 = deserialize(*ioBufs.at(0).first, type); + ASSERT_EQ(r0->size(), 3); + EXPECT_THAT( + nullableValues(r0, 0), + testing::ElementsAre(opt{10}, std::nullopt, std::nullopt)); + + // Partition 1 should have [30] from append 1 + [50] from append 2. + auto r1 = deserialize(*ioBufs.at(1).first, type); + ASSERT_EQ(r1->size(), 2); + EXPECT_THAT( + nullableValues(r1, 0), testing::ElementsAre(opt{30}, opt{50})); +} + +// Partition boundary falls in the middle of a null-bitmap byte, exercising the +// bit-extraction carry-over logic. +TEST_F(PrestoIterativePartitioningSerializerTest, nullsUnalignedBoundary) { + auto type = ROW({"a"}, {BIGINT()}); + using opt = std::optional; + // 5 rows → partition 0, 4 rows → partition 1. The boundary at bit 5 is + // inside the first byte of the null bitmap. + auto input = makeRowVector( + {"a"}, + {makeNullableFlatVector( + {10, + std::nullopt, + 30, + std::nullopt, + 50, + std::nullopt, + 70, + std::nullopt, + 90})}); + + auto serializer = makeSerializer(type, 2); + serializer->append(input, {0, 0, 0, 0, 0, 1, 1, 1, 1}); + auto ioBufs = serializer->flush(); + + ASSERT_EQ(ioBufs.size(), 2); + + auto r0 = deserialize(*ioBufs.at(0).first, type); + ASSERT_EQ(r0->size(), 5); + EXPECT_THAT( + nullableValues(r0, 0), + testing::ElementsAre( + opt{10}, std::nullopt, opt{30}, std::nullopt, opt{50})); + + auto r1 = deserialize(*ioBufs.at(1).first, type); + ASSERT_EQ(r1->size(), 4); + EXPECT_THAT( + nullableValues(r1, 0), + testing::ElementsAre(std::nullopt, opt{70}, std::nullopt, opt{90})); +} + +// ── BOOLEAN type +// ────────────────────────────────────────────────────────────── BOOLEAN +// columns use bit-packed FlatVector storage; the serializer writes each +// non-null value as a kByteArray byte (0x00 or 0x01). The tests below verify +// correct encoding, null handling, and partition routing. + +// Two partitions, no nulls: {T,F,T,F,T,F} alternating, even→p0, odd→p1. +TEST_F(PrestoIterativePartitioningSerializerTest, booleanNoNulls) { + auto type = ROW({"b"}, {BOOLEAN()}); + auto input = makeRowVector( + {"b"}, {makeFlatVector({true, false, true, false, true, false})}); + + auto serializer = makeSerializer(type, 2); + serializer->append(input, {0, 1, 0, 1, 0, 1}); + auto ioBufs = serializer->flush(); + + ASSERT_EQ(ioBufs.size(), 2); + + auto r0 = deserialize(*ioBufs.at(0).first, type); + ASSERT_EQ(r0->size(), 3); + EXPECT_THAT( + nullableValues(r0, 0), testing::ElementsAre(true, true, true)); + + auto r1 = deserialize(*ioBufs.at(1).first, type); + ASSERT_EQ(r1->size(), 3); + EXPECT_THAT( + nullableValues(r1, 0), testing::ElementsAre(false, false, false)); +} + +// Nulls mixed into both partitions. +TEST_F(PrestoIterativePartitioningSerializerTest, booleanWithNulls) { + auto type = ROW({"b"}, {BOOLEAN()}); + using opt = std::optional; + // p0: [T, null, F]; p1: [null, T]. + auto input = makeRowVector( + {"b"}, + {makeNullableFlatVector( + {true, std::nullopt, false, std::nullopt, true})}); + + auto serializer = makeSerializer(type, 2); + serializer->append(input, {0, 0, 0, 1, 1}); + auto ioBufs = serializer->flush(); + + ASSERT_EQ(ioBufs.size(), 2); + + auto r0 = deserialize(*ioBufs.at(0).first, type); + EXPECT_THAT( + nullableValues(r0, 0), + testing::ElementsAre(opt{true}, std::nullopt, opt{false})); + + auto r1 = deserialize(*ioBufs.at(1).first, type); + EXPECT_THAT( + nullableValues(r1, 0), + testing::ElementsAre(std::nullopt, opt{true})); +} + +// All values null in one partition; non-null values in the other. +TEST_F(PrestoIterativePartitioningSerializerTest, booleanAllNullsInPartition) { + auto type = ROW({"b"}, {BOOLEAN()}); + using opt = std::optional; + auto input = makeRowVector( + {"b"}, + {makeNullableFlatVector( + {std::nullopt, std::nullopt, true, false})}); + + auto serializer = makeSerializer(type, 2); + serializer->append(input, {0, 0, 1, 1}); + auto ioBufs = serializer->flush(); + + ASSERT_EQ(ioBufs.size(), 2); + + auto r0 = deserialize(*ioBufs.at(0).first, type); + EXPECT_THAT( + nullableValues(r0, 0), + testing::ElementsAre(std::nullopt, std::nullopt)); + + auto r1 = deserialize(*ioBufs.at(1).first, type); + EXPECT_THAT( + nullableValues(r1, 0), testing::ElementsAre(opt{true}, opt{false})); +} + +// Multiple BOOLEAN columns: each is serialized and deserialized independently. +TEST_F(PrestoIterativePartitioningSerializerTest, booleanMultipleColumns) { + auto type = ROW({"x", "y"}, {BOOLEAN(), BOOLEAN()}); + using opt = std::optional; + // 4 rows, 2 partitions. + auto input = makeRowVector( + {"x", "y"}, + {makeNullableFlatVector({true, std::nullopt, false, true}), + makeNullableFlatVector( + {std::nullopt, false, true, std::nullopt})}); + + auto serializer = makeSerializer(type, 2); + serializer->append(input, {0, 0, 1, 1}); + auto ioBufs = serializer->flush(); + + ASSERT_EQ(ioBufs.size(), 2); + + auto r0 = deserialize(*ioBufs.at(0).first, type); + ASSERT_EQ(r0->size(), 2); + EXPECT_THAT( + nullableValues(r0, 0), + testing::ElementsAre(opt{true}, std::nullopt)); + EXPECT_THAT( + nullableValues(r0, 1), + testing::ElementsAre(std::nullopt, opt{false})); + + auto r1 = deserialize(*ioBufs.at(1).first, type); + ASSERT_EQ(r1->size(), 2); + EXPECT_THAT( + nullableValues(r1, 0), testing::ElementsAre(opt{false}, opt{true})); + EXPECT_THAT( + nullableValues(r1, 1), + testing::ElementsAre(opt{true}, std::nullopt)); +} + +// ── Lifecycle +// ───────────────────────────────────────────────────────────────── + +// Flush twice: second flush on empty state returns an empty map. +TEST_F(PrestoIterativePartitioningSerializerTest, flushTwice) { + auto type = ROW({"a"}, {BIGINT()}); + auto serializer = makeSerializer(type, 2); + serializer->append( + makeRowVector({"a"}, {makeFlatVector({10, 20})}), {0, 1}); + + auto ioBufs1 = serializer->flush(); + ASSERT_EQ(ioBufs1.size(), 2); + + EXPECT_TRUE(serializer->flush().empty()); +} + +// Append and flush multiple independent cycles. +TEST_F(PrestoIterativePartitioningSerializerTest, multipleCycles) { + auto type = ROW({"a"}, {INTEGER()}); + auto serializer = makeSerializer(type, 2); + + for (int cycle = 0; cycle < 3; ++cycle) { + serializer->append( + makeRowVector( + {"a"}, {makeFlatVector({cycle * 2, cycle * 2 + 1})}), + {0, 1}); + auto ioBufs = serializer->flush(); + ASSERT_EQ(ioBufs.size(), 2) << "cycle " << cycle; + + auto r0 = deserialize(*ioBufs.at(0).first, type); + auto r1 = deserialize(*ioBufs.at(1).first, type); + ASSERT_EQ(r0->size(), 1) << "cycle " << cycle; + ASSERT_EQ(r1->size(), 1) << "cycle " << cycle; + EXPECT_EQ(r0->childAt(0)->as>()->valueAt(0), cycle * 2); + EXPECT_EQ( + r1->childAt(0)->as>()->valueAt(0), cycle * 2 + 1); + } +} + +// ── Scale and regression +// ────────────────────────────────────────────────────── + +// 1024 partitions with random int64 values: verify every value reaches +// exactly the right partition and nothing is lost or duplicated. +TEST_F(PrestoIterativePartitioningSerializerTest, manyPartitionsRandom) { + constexpr uint32_t kNumPartitions = 1024; + constexpr int32_t kNumRows = 64'000; + + std::mt19937_64 rng(42); + std::uniform_int_distribution valueDist; + std::uniform_int_distribution partDist(0, kNumPartitions - 1); + + std::vector inputValues(kNumRows); + std::vector partitions(kNumRows); + // expected[p] holds the sorted values assigned to partition p. + std::vector> expected(kNumPartitions); + + for (int i = 0; i < kNumRows; ++i) { + inputValues[i] = valueDist(rng); + partitions[i] = partDist(rng); + expected[partitions[i]].push_back(inputValues[i]); + } + for (auto& v : expected) { + std::sort(v.begin(), v.end()); + } + + auto type = ROW({"v"}, {BIGINT()}); + auto input = makeRowVector({"v"}, {makeFlatVector(inputValues)}); + + auto serializer = makeSerializer(type, kNumPartitions); + serializer->append(input, partitions); + auto ioBufs = serializer->flush(); + + // Every non-empty partition must have a page; empty partitions must not. + for (uint32_t p = 0; p < kNumPartitions; ++p) { + if (expected[p].empty()) { + EXPECT_EQ(ioBufs.count(p), 0) << "partition " << p; + } else { + ASSERT_EQ(ioBufs.count(p), 1) << "partition " << p; + auto result = deserialize(*ioBufs.at(p).first, type); + ASSERT_EQ(result->size(), static_cast(expected[p].size())) + << "partition " << p; + EXPECT_EQ(sortedValues(result, 0), expected[p]) + << "partition " << p; + } + } +} + +// 1024 partitions with random int64 values and ~25% nulls: verify every +// value and null reaches exactly the right partition in input order, and +// nothing is lost or duplicated. +TEST_F( + PrestoIterativePartitioningSerializerTest, + manyPartitionsRandomWithNulls) { + constexpr uint32_t kNumPartitions = 1024; + constexpr int32_t kNumRows = 64'000; + constexpr int32_t kNullPct = 25; + + std::mt19937_64 rng(43); + std::uniform_int_distribution valueDist; + std::uniform_int_distribution partDist(0, kNumPartitions - 1); + std::uniform_int_distribution nullDist(0, 99); + + std::vector> inputValues(kNumRows); + std::vector partitions(kNumRows); + // expected[p] holds the sequence of (value-or-null) assigned to partition p + // in input order. + std::vector>> expected(kNumPartitions); + + for (int i = 0; i < kNumRows; ++i) { + partitions[i] = partDist(rng); + if (nullDist(rng) < kNullPct) { + inputValues[i] = std::nullopt; + } else { + inputValues[i] = valueDist(rng); + } + expected[partitions[i]].push_back(inputValues[i]); + } + + auto type = ROW({"v"}, {BIGINT()}); + auto input = + makeRowVector({"v"}, {makeNullableFlatVector(inputValues)}); + + auto serializer = makeSerializer(type, kNumPartitions); + serializer->append(input, partitions); + auto ioBufs = serializer->flush(); + + // Partition rearranges values within each partition, so compare sorted. + // std::optional sorts with nullopt < any value, preserving null count. + for (uint32_t p = 0; p < kNumPartitions; ++p) { + if (expected[p].empty()) { + EXPECT_EQ(ioBufs.count(p), 0) << "partition " << p; + } else { + ASSERT_EQ(ioBufs.count(p), 1) << "partition " << p; + auto result = deserialize(*ioBufs.at(p).first, type); + ASSERT_EQ(result->size(), static_cast(expected[p].size())) + << "partition " << p; + + auto expectedSorted = expected[p]; + std::sort(expectedSorted.begin(), expectedSorted.end()); + + auto actual = nullableValues(result, 0); + std::sort(actual.begin(), actual.end()); + + EXPECT_EQ(actual, expectedSorted) << "partition " << p; + } + } +} + +// Regression: flushNulls previously wrote null bitmaps by obtaining a raw +// pointer via writePosition() then advancing the stream via seekp(). This +// assumed the pre-allocated IOBufOutputStream had a single contiguous range, +// but StreamArena::newRange caps each range at the size of one allocator run, +// which can be smaller than the requested size. seekp() then failed because +// the target position exceeded the end of the first (and only) range. +// +// Reproducing condition: 16 columns × 10'000 rows × 50% nulls in one +// partition generates enough output (~100 KB) to trigger the run-size cap. +TEST_F( + PrestoIterativePartitioningSerializerTest, + flushNullsBitmapManyColumnsLargeRowCount) { + constexpr int32_t kNumCols = 16; + constexpr int32_t kNumRows = 10'000; + + std::vector names; + std::vector children; + names.reserve(kNumCols); + children.reserve(kNumCols); + + for (int col = 0; col < kNumCols; ++col) { + names.push_back(fmt::format("c{}", col)); + // Rows where (row % 2 == 0) are null; the rest hold (row * kNumCols + col). + children.push_back( + makeFlatVector( + kNumRows, + [col](auto row) { + return static_cast(row * kNumCols + col); + }, + [](auto row) { return (row % 2) == 0; })); + } + + auto input = makeRowVector(names, children); + auto rowType = std::static_pointer_cast(input->type()); + + auto serializer = makeSerializer(rowType, 1); + serializer->append(input, std::vector(kNumRows, 0)); + auto ioBufs = serializer->flush(); + + ASSERT_EQ(ioBufs.size(), 1); + + auto result = deserialize(*ioBufs.at(0).first, rowType); + ASSERT_EQ(result->size(), kNumRows); + + for (int col = 0; col < kNumCols; ++col) { + auto* flat = result->childAt(col)->as>(); + for (int row = 0; row < kNumRows; ++row) { + if ((row % 2) == 0) { + EXPECT_TRUE(result->childAt(col)->isNullAt(row)) + << "col=" << col << " row=" << row; + } else { + ASSERT_FALSE(result->childAt(col)->isNullAt(row)) + << "col=" << col << " row=" << row; + EXPECT_EQ( + flat->valueAt(row), static_cast(row * kNumCols + col)) + << "col=" << col << " row=" << row; + } + } + } +} From 854b6e8f2793061f45b410bedd909ee487d1ff68 Mon Sep 17 00:00:00 2001 From: Xin Zhang Date: Thu, 26 Mar 2026 14:51:58 +0000 Subject: [PATCH 10/10] feat(PartitionedOutput): Add flushCompressed implementation --- .../PrestoIterativePartitioningSerializer.cpp | 138 +++++++++++++++--- .../PrestoIterativePartitioningSerializer.h | 5 +- ...stoIterativePartitioningSerializerTest.cpp | 57 +++++++- 3 files changed, 175 insertions(+), 25 deletions(-) diff --git a/velox/serializers/PrestoIterativePartitioningSerializer.cpp b/velox/serializers/PrestoIterativePartitioningSerializer.cpp index d14c1ea96ab..eaf4d34ede2 100644 --- a/velox/serializers/PrestoIterativePartitioningSerializer.cpp +++ b/velox/serializers/PrestoIterativePartitioningSerializer.cpp @@ -24,6 +24,7 @@ namespace facebook::velox::serializer::presto { namespace { +constexpr int8_t kCompressedBitMask = 1; constexpr int8_t kCheckSumBitMask = 4; constexpr int64_t kVectorSizeTypeSize{sizeof(vector_size_t)}; // [numRows:4][codec:1] @@ -350,7 +351,9 @@ PrestoIterativePartitioningSerializer::flushUncompressed() { rawOutputStreams[p] = outputStreams[p].get(); beginStreamPositions[p] = outputStreams[p]->tellp(); - flushStart(*outputStreams[p], p, codecMask); + flushStart(*outputStreams[p], rowsPerPartition_[p], codecMask); + + writeInt32(rawOutputStreams[p], static_cast(numColumns_)); } // 4. Flush column data. @@ -362,10 +365,13 @@ PrestoIterativePartitioningSerializer::flushUncompressed() { std::map, vector_size_t>> result; for (uint32_t p : nonEmptyPartitions) { + const auto uncompressedSize = static_cast( + outputStreams[p]->tellp() - beginStreamPositions[p] - kHeaderSize); flushFinish( *outputStreams[p], - p, + rowsPerPartition_[p], beginStreamPositions[p], + uncompressedSize, codecMask, *listeners[p]); result[p] = @@ -377,7 +383,108 @@ PrestoIterativePartitioningSerializer::flushUncompressed() { std::map, vector_size_t>> PrestoIterativePartitioningSerializer::flushCompressed() { - VELOX_NYI(); + if (partitionedRowVectors_.empty()) { + return {}; + } + + std::vector nonEmptyPartitions; + for (uint32_t p = 0; p < numPartitions_; ++p) { + if (rowsPerPartition_[p] > 0) { + nonEmptyPartitions.push_back(p); + } + } + + const auto& rowSchema = type_->asRow(); + flushSizes_.assign(rowSchema.size(), std::vector(numPartitions_, 0)); + for (uint32_t col = 0; col < rowSchema.size(); ++col) { + std::vector columnVectors; + columnVectors.reserve(partitionedRowVectors_.size()); + for (const auto& pRowVector : partitionedRowVectors_) { + columnVectors.push_back( + std::dynamic_pointer_cast(pRowVector) + ->childAt(col)); + } + flushSizes_[col] = computeColumnFlushSizes( + columnVectors, + rowSchema.childAt(col), + nonEmptyPartitions, + rowsPerPartition_, + numPartitions_); + } + + // Temporary uncompressed payload streams per partition, used as compression + // input before finalize the page. + std::vector> outStreams(numPartitions_); + std::vector rawOutStreams(numPartitions_); + + for (uint32_t p : nonEmptyPartitions) { + int64_t initialSize = 4; // numCols + for (uint32_t col = 0; col < rowSchema.size(); ++col) { + initialSize += flushSizes_[col][p]; + } + outStreams[p] = + std::make_unique(*pool_, nullptr, initialSize); + rawOutStreams[p] = outStreams[p].get(); + writeInt32(rawOutStreams[p], static_cast(numColumns_)); + } + + flushRowChildren( + partitionedRowVectors_, rowSchema, nonEmptyPartitions, rawOutStreams); + + // Create final output streams sized to the exact bytes each partition needs. + std::vector> listeners( + numPartitions_); + std::vector> outputStreams(numPartitions_); + std::vector rawOutputStreams(numPartitions_); + std::vector beginStreamPositions(numPartitions_); + + auto codec = common::compressionKindToCodec(opts_.compressionKind); + std::map, vector_size_t>> + result; + + for (uint32_t p : nonEmptyPartitions) { + const auto uncompressedSize = static_cast(outStreams[p]->tellp()); + VELOX_CHECK_LE( + uncompressedSize, + codec->maxUncompressedLength(), + "UncompressedSize exceeds limit"); + + auto iobuf = outStreams[p]->getIOBuf(); + auto compressedBuffer = codec->compress(iobuf.get()); + const auto compressedSize = + static_cast(compressedBuffer->computeChainDataLength()); + + const bool compressed = + compressedSize <= uncompressedSize * opts_.minCompressionRatio; + const auto serializedSize = compressed ? compressedSize : uncompressedSize; + const char codecMask = + compressed ? (kCompressedBitMask | kCheckSumBitMask) : getCodecMarker(); + const auto& serializedBuffer = compressed ? compressedBuffer : iobuf; + + listeners[p] = std::make_unique(); + outputStreams[p] = std::make_unique( + *pool_, listeners[p].get(), kHeaderSize + serializedSize); + const auto beginOffset = outputStreams[p]->tellp(); + + flushStart(*outputStreams[p], rowsPerPartition_[p], codecMask); + + for (auto range : *serializedBuffer) { + outputStreams[p]->write( + reinterpret_cast(range.data()), range.size()); + } + + flushFinish( + *outputStreams[p], + rowsPerPartition_[p], + beginOffset, + uncompressedSize, + codecMask, + *listeners[p]); + result[p] = + std::make_pair(outputStreams[p]->getIOBuf(), rowsPerPartition_[p]); + } + + return result; } // --------------------------------------------------------------------------- @@ -386,7 +493,7 @@ PrestoIterativePartitioningSerializer::flushCompressed() { void PrestoIterativePartitioningSerializer::flushStart( IOBufOutputStream& out, - uint32_t partition, + int32_t numRows, char codecMask) const { auto* listener = dynamic_cast(out.listener()); if (listener) { @@ -394,7 +501,6 @@ void PrestoIterativePartitioningSerializer::flushStart( } // Write 21-byte Presto page header; sizes and CRC are filled in later. - const int32_t numRows = static_cast(rowsPerPartition_[partition]); char header[kHeaderSize] = {}; std::memcpy(&header[0], &numRows, 4); std::memcpy(&header[4], &codecMask, 1); @@ -403,10 +509,6 @@ void PrestoIterativePartitioningSerializer::flushStart( if (listener) { listener->resume(); } - - // Number of columns is included in the CRC. - const int32_t numCols = static_cast(numColumns_); - out.write(reinterpret_cast(&numCols), 4); } void PrestoIterativePartitioningSerializer::flushRowChildren( @@ -431,26 +533,24 @@ void PrestoIterativePartitioningSerializer::flushRowChildren( void PrestoIterativePartitioningSerializer::flushFinish( IOBufOutputStream& out, - uint32_t partition, + int32_t numRows, std::streampos beginOffset, + int32_t uncompressedSize, char codecMask, PrestoOutputStreamListener& listener) const { listener.pause(); - const std::streampos totalSize = - static_cast(out.tellp() - beginOffset); - const std::streampos uncompressedSize = totalSize - kHeaderSize; + const auto endOffset = out.tellp(); + const auto serializedSize = + static_cast(out.tellp() - beginOffset - kHeaderSize); const int64_t crc = computeChecksum( - listener, - static_cast(codecMask), - static_cast(rowsPerPartition_[partition]), - uncompressedSize); + listener, static_cast(codecMask), numRows, uncompressedSize); out.seekp(beginOffset + kUncompressedSizeOffset); writeInt32(&out, uncompressedSize); - writeInt32(&out, uncompressedSize); // TODO: compressedSize + writeInt32(&out, serializedSize); writeInt64(&out, crc); - out.seekp(beginOffset + totalSize); + out.seekp(endOffset); } // --------------------------------------------------------------------------- diff --git a/velox/serializers/PrestoIterativePartitioningSerializer.h b/velox/serializers/PrestoIterativePartitioningSerializer.h index b9e41286ea6..9fc0f1d57b7 100644 --- a/velox/serializers/PrestoIterativePartitioningSerializer.h +++ b/velox/serializers/PrestoIterativePartitioningSerializer.h @@ -79,13 +79,14 @@ class PrestoIterativePartitioningSerializer { std::map, vector_size_t>> flushCompressed(); - void flushStart(IOBufOutputStream& out, uint32_t partition, char codecMask) + void flushStart(IOBufOutputStream& out, int32_t numRows, char codecMask) const; void flushFinish( IOBufOutputStream& out, - uint32_t partition, + int32_t numRows, std::streampos beginOffset, + int32_t uncompressedSize, char codecMask, PrestoOutputStreamListener& listener) const; diff --git a/velox/serializers/tests/PrestoIterativePartitioningSerializerTest.cpp b/velox/serializers/tests/PrestoIterativePartitioningSerializerTest.cpp index e3535a54406..a35c0911615 100644 --- a/velox/serializers/tests/PrestoIterativePartitioningSerializerTest.cpp +++ b/velox/serializers/tests/PrestoIterativePartitioningSerializerTest.cpp @@ -19,7 +19,9 @@ #include #include +#include "velox/serializers/PrestoHeader.h" #include "velox/serializers/PrestoIterativePartitioningSerializer.h" +#include "velox/serializers/PrestoSerializerDeserializationUtils.h" #include "velox/vector/tests/utils/VectorTestBase.h" using namespace facebook::velox; @@ -37,11 +39,14 @@ class PrestoIterativePartitioningSerializerTest : public ::testing::Test, } /// Deserializes an IOBuf produced by PartitioningSerializer::flush(). - RowVectorPtr deserialize(folly::IOBuf& iobuf, const RowTypePtr& type) { + RowVectorPtr deserialize( + folly::IOBuf& iobuf, + const RowTypePtr& type, + const SerdeOpts* opts = {}) { auto ranges = byteRangesFromIOBuf(&iobuf); BufferInputStream stream(std::move(ranges)); RowVectorPtr result; - serde_.deserialize(&stream, pool_.get(), type, &result, nullptr); + serde_.deserialize(&stream, pool_.get(), type, &result, opts); return result; } @@ -75,8 +80,8 @@ class PrestoIterativePartitioningSerializerTest : public ::testing::Test, /// Builds a PrestoIterativePartitioningSerializer with default serde options. std::unique_ptr makeSerializer( const RowTypePtr& type, - uint32_t numPartitions) { - SerdeOpts opts; + uint32_t numPartitions, + const SerdeOpts& opts = {}) { return std::make_unique( type, numPartitions, opts, pool_.get()); } @@ -496,6 +501,50 @@ TEST_F(PrestoIterativePartitioningSerializerTest, booleanMultipleColumns) { testing::ElementsAre(opt{true}, std::nullopt)); } +// Verify compressed page header bits and round-trip deserialization. +TEST_F(PrestoIterativePartitioningSerializerTest, compressedRoundTrip) { + constexpr int32_t kNumRows = 5'000; + + SerdeOpts opts; + opts.compressionKind = common::CompressionKind::CompressionKind_ZLIB; + opts.minCompressionRatio = 0.99; + + std::vector values(kNumRows, 7); + std::vector partitions(kNumRows); + for (int32_t i = 0; i < kNumRows; ++i) { + partitions[i] = i % 2; + } + + auto type = ROW({"a", "b"}, {BIGINT(), BIGINT()}); + auto input = makeRowVector( + {"a", "b"}, + {makeFlatVector(values), makeFlatVector(values)}); + + auto serializer = makeSerializer(type, 2, opts); + serializer->append(input, partitions); + auto ioBufs = serializer->flush(); + + ASSERT_EQ(ioBufs.size(), 2); + + const std::vector expected(kNumRows / 2, 7); + for (uint32_t partition = 0; partition < 2; ++partition) { + auto ranges = byteRangesFromIOBuf(ioBufs.at(partition).first.get()); + BufferInputStream stream(std::move(ranges)); + auto maybeHeader = serializer::presto::detail::PrestoHeader::read(&stream); + ASSERT_TRUE(maybeHeader.hasValue()); + + const auto header = maybeHeader.value(); + EXPECT_TRUE( + serializer::presto::detail::isCompressedBitSet(header.pageCodecMarker)); + EXPECT_LT(header.compressedSize, header.uncompressedSize); + + auto result = deserialize(*ioBufs.at(partition).first, type, &opts); + ASSERT_EQ(result->size(), kNumRows / 2); + EXPECT_EQ(sortedValues(result, 0), expected); + EXPECT_EQ(sortedValues(result, 1), expected); + } +} + // ── Lifecycle // ─────────────────────────────────────────────────────────────────