diff --git a/velox/serializers/CMakeLists.txt b/velox/serializers/CMakeLists.txt index 9c6390cc730..966e8422881 100644 --- a/velox/serializers/CMakeLists.txt +++ b/velox/serializers/CMakeLists.txt @@ -29,6 +29,7 @@ velox_add_library( UnsafeRowSerializer.cpp PrestoBatchVectorSerializer.cpp PrestoHeader.cpp + PrestoIterativePartitioningSerializer.cpp PrestoIterativeVectorSerializer.cpp PrestoSerializerDeserializationUtils.cpp PrestoSerializerEstimationUtils.cpp diff --git a/velox/serializers/PrestoIterativePartitioningSerializer.cpp b/velox/serializers/PrestoIterativePartitioningSerializer.cpp new file mode 100644 index 00000000000..eaf4d34ede2 --- /dev/null +++ b/velox/serializers/PrestoIterativePartitioningSerializer.cpp @@ -0,0 +1,831 @@ +/* + * Copyright (c) International Business Machines Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "velox/serializers/PrestoIterativePartitioningSerializer.h" + +#include "velox/common/base/BitUtil.h" +#include "velox/type/Type.h" +#include "velox/vector/ComplexVector.h" +#include "velox/vector/FlatVector.h" + +namespace facebook::velox::serializer::presto { + +namespace { + +constexpr int8_t kCompressedBitMask = 1; +constexpr int8_t kCheckSumBitMask = 4; +constexpr int64_t kVectorSizeTypeSize{sizeof(vector_size_t)}; +// [numRows:4][codec:1] +constexpr int64_t kUncompressedSizeOffset{kVectorSizeTypeSize + 1}; +// [numRows:4][codec:1][uncompressedSize:4][compressedSize:4][checksum:8] +constexpr int64_t kHeaderSize{kUncompressedSizeOffset + 4 + 4 + 8}; + +static inline const std::string_view kByteArray{"BYTE_ARRAY"}; +static inline const std::string_view kShortArray{"SHORT_ARRAY"}; +static inline const std::string_view kIntArray{"INT_ARRAY"}; +static inline const std::string_view kLongArray{"LONG_ARRAY"}; +static inline const std::string_view kInt128Array{"INT128_ARRAY"}; +static inline const std::string_view kVariableWidth{"VARIABLE_WIDTH"}; +static inline const std::string_view kRow{"ROW"}; + +inline void writeInt32(OutputStream* out, int32_t value) { + out->write(reinterpret_cast(&value), sizeof(value)); +} + +inline void writeInt64(OutputStream* out, int64_t value) { + out->write(reinterpret_cast(&value), sizeof(value)); +} + +char getCodecMarker() { + char marker = 0; + marker |= kCheckSumBitMask; + return marker; +} + +std::string_view typeToEncodingName(const TypePtr& type) { + switch (type->kind()) { + case TypeKind::BOOLEAN: + case TypeKind::TINYINT: + return kByteArray; + case TypeKind::SMALLINT: + return kShortArray; + case TypeKind::INTEGER: + case TypeKind::REAL: + return kIntArray; + case TypeKind::BIGINT: + case TypeKind::DOUBLE: + case TypeKind::TIMESTAMP: + return kLongArray; + case TypeKind::HUGEINT: + return kInt128Array; + case TypeKind::VARCHAR: + case TypeKind::VARBINARY: + return kVariableWidth; + case TypeKind::ROW: + return 
kRow; + default: + VELOX_FAIL("Unsupported type kind: {}", static_cast(type->kind())); + } +} + +/// Finalizes the Presto page CRC by mixing in the codec marker, row count, +/// and uncompressed size on top of the listener's accumulated data checksum. +int64_t computeChecksum( + PrestoOutputStreamListener& listener, + int8_t codecMarker, + int32_t numRows, + int32_t uncompressedSize) { + auto crc = listener.crc(); + crc.process_bytes(&codecMarker, 1); + crc.process_bytes(&numRows, 4); + crc.process_bytes(&uncompressedSize, 4); + return static_cast(crc.checksum()); +} + +/// Returns the serialized byte width of a fixed-width type, matching the +/// sizeof(T) used in flushFlatValues. Kinds that are not listed yield 0. +/// NOTE(review): TIMESTAMP is reported as 16 bytes here while +/// typeToEncodingName() maps TIMESTAMP to LONG_ARRAY; confirm the intended wire +/// width before TIMESTAMP columns are enabled. +int32_t fixedTypeWidth(TypeKind kind) { + switch (kind) { + case TypeKind::BOOLEAN: + case TypeKind::TINYINT: + return 1; + case TypeKind::SMALLINT: + return 2; + case TypeKind::INTEGER: + case TypeKind::REAL: + return 4; + case TypeKind::BIGINT: + case TypeKind::DOUBLE: + return 8; + case TypeKind::TIMESTAMP: + case TypeKind::HUGEINT: + return 16; + default: + return 0; + } +} + +/// Returns the exact bytes for one fixed-width column in one partition: the +/// length-prefixed encoding name, the row count, the null flag, the null bitmap +/// (only when the partition has nulls) and the non-null values. +int64_t +simpleColumnBytes(const TypePtr& colType, int64_t numRows, int64_t numNulls) { + const auto encodingName = typeToEncodingName(colType); + return 4 + static_cast(encodingName.size()) + // header + 4 + // rowCount + 1 + // nullFlag + (numNulls > 0 ? bits::nbytes(numRows) : 0) + // null bitmap + (numRows - numNulls) * fixedTypeWidth(colType->kind()); // values +} + +/// Returns per-partition exact byte counts for one column (all partitions). +/// Recurses into nested ROW columns. +/// +/// Byte layout per column type: +/// Fixed-width: simpleColumnBytes(colType, numRows, numNulls) +/// ROW: 7 (header) + 4 (numFields) +/// + sum(child sizes) +/// + 4 (numRows) + 4*(numRows+1) (offsets) + 1 (hasNulls) +/// + (rowNulls>0 ?
bits::nbytes(numRows) : 0) +std::vector computeColumnFlushSizes( + const std::vector& columnVectors, + const TypePtr& colType, + const std::vector& nonEmptyPartitions, + const std::vector& rowsPerPartition, + uint32_t numPartitions) { + std::vector sizes(numPartitions, 0); + + // Compute per-partition null counts by summing across batches. + std::vector nullCounts(numPartitions, 0); + for (uint32_t p : nonEmptyPartitions) { + for (const auto& pv : columnVectors) { + nullCounts[p] += pv->numNullsAt(p); + } + } + + switch (colType->kind()) { + case TypeKind::BOOLEAN: + case TypeKind::TINYINT: + case TypeKind::SMALLINT: + case TypeKind::INTEGER: + case TypeKind::BIGINT: + case TypeKind::REAL: + case TypeKind::DOUBLE: + case TypeKind::HUGEINT: + for (uint32_t p : nonEmptyPartitions) { + sizes[p] = + simpleColumnBytes(colType, rowsPerPartition[p], nullCounts[p]); + } + break; + + case TypeKind::TIMESTAMP: + case TypeKind::VARCHAR: + case TypeKind::VARBINARY: + case TypeKind::ARRAY: + case TypeKind::MAP: + VELOX_NYI( + "computeColumnFlushSizes: unsupported type kind {}", + TypeKindName::toName(colType->kind())); + + case TypeKind::ROW: { + const auto& rowSchema = colType->asRow(); + const int32_t numFields = static_cast(rowSchema.size()); + + // Fixed per-partition overhead: header(7) + numFields(4) + footer: + // numRows(4) + // + sequential offsets 4*(numRows+1) + hasNulls(1) + // + null bitmap for the ROW vector itself if any rows in this partition + // are null. + for (uint32_t p : nonEmptyPartitions) { + const int64_t numRows = rowsPerPartition[p]; + const int64_t rowNullBitmapBytes = + nullCounts[p] > 0 ? bits::nbytes(numRows) : 0; + sizes[p] = 7 + 4 + // "ROW" header + numFields + 4 + 4 * (numRows + 1) + 1 + // footer: numRows + offsets + hasNulls + rowNullBitmapBytes; + } + // Add child column sizes recursively. 
+ for (uint32_t col = 0; col < static_cast(numFields); ++col) { + std::vector childVectors; + childVectors.reserve(columnVectors.size()); + for (const auto& pv : columnVectors) { + childVectors.push_back( + std::dynamic_pointer_cast(pv)->childAt( + col)); + } + const auto childSizes = computeColumnFlushSizes( + childVectors, + rowSchema.childAt(col), + nonEmptyPartitions, + rowsPerPartition, + numPartitions); + for (uint32_t p : nonEmptyPartitions) { + sizes[p] += childSizes[p]; + } + } + break; + } + + default: + VELOX_UNSUPPORTED( + "computeColumnFlushSizes: unsupported type kind {}", + TypeKindName::toName(colType->kind())); + } + return sizes; +} + +} // namespace + +PrestoIterativePartitioningSerializer::PrestoIterativePartitioningSerializer( + RowTypePtr inputType, + uint32_t numPartitions, + const SerdeOpts& opts, + memory::MemoryPool* pool) + : type_(std::move(inputType)), + numPartitions_(numPartitions), + opts_(opts), + pool_(pool), + rowsPerPartition_(numPartitions, 0) { + VELOX_CHECK_GT(numPartitions_, 0); + VELOX_CHECK_NOT_NULL(pool_); + + numColumns_ = type_->size(); +} + +void PrestoIterativePartitioningSerializer::append( + const RowVectorPtr& input, + const std::vector& partitions) { + VELOX_CHECK_NOT_NULL(input); + VELOX_CHECK_EQ( + input->size(), + partitions.size(), + "partitions.size() must equal input->size()"); + + if (input->size() == 0) { + return; + } + + PartitionBuildContext ctx; + auto partitionedRowVector = PartitionedVector::create( + std::static_pointer_cast(input), + partitions, + numPartitions_, + ctx, + pool_); + + const vector_size_t* partitionOffsets = + partitionedRowVector->rawPartitionOffsets(); + vector_size_t prevOffset = 0; + for (uint32_t p = 0; p < numPartitions_; ++p) { + rowsPerPartition_[p] += partitionOffsets[p] - prevOffset; + prevOffset = partitionOffsets[p]; + } + + partitionedRowVectors_.push_back(std::move(partitionedRowVector)); + + bytesBuffered_ += input->retainedSize(); + rowsBuffered_ += 
static_cast(input->size()); +} + +// --------------------------------------------------------------------------- +// Top-level flush +// --------------------------------------------------------------------------- + +std::map, vector_size_t>> +PrestoIterativePartitioningSerializer::flush() { + auto pages = + (opts_.compressionKind == common::CompressionKind::CompressionKind_NONE) + ? flushUncompressed() + : flushCompressed(); + + partitionedRowVectors_.clear(); + flushSizes_.clear(); + std::fill(rowsPerPartition_.begin(), rowsPerPartition_.end(), 0); + bytesBuffered_ = 0; + rowsBuffered_ = 0; + + return pages; +} + +std::map, vector_size_t>> +PrestoIterativePartitioningSerializer::flushUncompressed() { + if (partitionedRowVectors_.empty()) { + return {}; + } + + const char codecMask = getCodecMarker(); + + // 1. Determine non-empty partitions. + std::vector nonEmptyPartitions; + for (uint32_t p = 0; p < numPartitions_; ++p) { + if (rowsPerPartition_[p] > 0) { + nonEmptyPartitions.push_back(p); + } + } + + // 2. Pre-compute exact byte sizes per top-level column and partition. + const auto& rowSchema = type_->asRow(); + flushSizes_.assign(rowSchema.size(), std::vector(numPartitions_, 0)); + for (uint32_t col = 0; col < rowSchema.size(); ++col) { + std::vector columnVectors; + columnVectors.reserve(partitionedRowVectors_.size()); + for (const auto& pRowVector : partitionedRowVectors_) { + columnVectors.push_back( + std::dynamic_pointer_cast(pRowVector) + ->childAt(col)); + } + flushSizes_[col] = computeColumnFlushSizes( + columnVectors, + rowSchema.childAt(col), + nonEmptyPartitions, + rowsPerPartition_, + numPartitions_); + } + + // 3. Create output streams sized to the exact bytes each partition will need, + // so that the entire payload fits. This avoids multiple resizing and copying. 
+ std::vector> listeners( + numPartitions_); + std::vector> outputStreams(numPartitions_); + std::vector rawOutputStreams(numPartitions_); + std::vector beginStreamPositions(numPartitions_); + + for (uint32_t p : nonEmptyPartitions) { + int64_t initialSize = kHeaderSize + 4; // page header + numCols + for (uint32_t col = 0; col < rowSchema.size(); ++col) { + initialSize += flushSizes_[col][p]; + } + listeners[p] = std::make_unique(); + outputStreams[p] = std::make_unique( + *pool_, listeners[p].get(), initialSize); + rawOutputStreams[p] = outputStreams[p].get(); + beginStreamPositions[p] = outputStreams[p]->tellp(); + + flushStart(*outputStreams[p], rowsPerPartition_[p], codecMask); + + writeInt32(rawOutputStreams[p], static_cast(numColumns_)); + } + + // 4. Flush column data. + flushRowChildren( + partitionedRowVectors_, rowSchema, nonEmptyPartitions, rawOutputStreams); + + // 5. Finalize the page by seeking back to fill in sizes and CRC, and get the + // IOBuf and numOfRows from each stream. 
+ std::map, vector_size_t>> + result; + for (uint32_t p : nonEmptyPartitions) { + const auto uncompressedSize = static_cast( + outputStreams[p]->tellp() - beginStreamPositions[p] - kHeaderSize); + flushFinish( + *outputStreams[p], + rowsPerPartition_[p], + beginStreamPositions[p], + uncompressedSize, + codecMask, + *listeners[p]); + result[p] = + std::make_pair(outputStreams[p]->getIOBuf(), rowsPerPartition_[p]); + } + + return result; +} + +std::map, vector_size_t>> +PrestoIterativePartitioningSerializer::flushCompressed() { + if (partitionedRowVectors_.empty()) { + return {}; + } + + std::vector nonEmptyPartitions; + for (uint32_t p = 0; p < numPartitions_; ++p) { + if (rowsPerPartition_[p] > 0) { + nonEmptyPartitions.push_back(p); + } + } + + const auto& rowSchema = type_->asRow(); + flushSizes_.assign(rowSchema.size(), std::vector(numPartitions_, 0)); + for (uint32_t col = 0; col < rowSchema.size(); ++col) { + std::vector columnVectors; + columnVectors.reserve(partitionedRowVectors_.size()); + for (const auto& pRowVector : partitionedRowVectors_) { + columnVectors.push_back( + std::dynamic_pointer_cast(pRowVector) + ->childAt(col)); + } + flushSizes_[col] = computeColumnFlushSizes( + columnVectors, + rowSchema.childAt(col), + nonEmptyPartitions, + rowsPerPartition_, + numPartitions_); + } + + // Temporary uncompressed payload streams, one per non-empty partition. Each + // one is used as the compression input before the page is finalized.
+ std::vector> outStreams(numPartitions_); + std::vector rawOutStreams(numPartitions_); + + for (uint32_t p : nonEmptyPartitions) { + int64_t initialSize = 4; // numCols + for (uint32_t col = 0; col < rowSchema.size(); ++col) { + initialSize += flushSizes_[col][p]; + } + outStreams[p] = + std::make_unique(*pool_, nullptr, initialSize); + rawOutStreams[p] = outStreams[p].get(); + writeInt32(rawOutStreams[p], static_cast(numColumns_)); + } + + flushRowChildren( + partitionedRowVectors_, rowSchema, nonEmptyPartitions, rawOutStreams); + + // Create final output streams sized to the exact bytes each partition needs. + std::vector> listeners( + numPartitions_); + std::vector> outputStreams(numPartitions_); + std::vector rawOutputStreams(numPartitions_); + std::vector beginStreamPositions(numPartitions_); + + auto codec = common::compressionKindToCodec(opts_.compressionKind); + std::map, vector_size_t>> + result; + + for (uint32_t p : nonEmptyPartitions) { + const auto uncompressedSize = static_cast(outStreams[p]->tellp()); + VELOX_CHECK_LE( + uncompressedSize, + codec->maxUncompressedLength(), + "UncompressedSize exceeds limit"); + + auto iobuf = outStreams[p]->getIOBuf(); + auto compressedBuffer = codec->compress(iobuf.get()); + const auto compressedSize = + static_cast(compressedBuffer->computeChainDataLength()); + + const bool compressed = + compressedSize <= uncompressedSize * opts_.minCompressionRatio; + const auto serializedSize = compressed ? compressedSize : uncompressedSize; + const char codecMask = + compressed ? (kCompressedBitMask | kCheckSumBitMask) : getCodecMarker(); + const auto& serializedBuffer = compressed ? 
compressedBuffer : iobuf; + + listeners[p] = std::make_unique(); + outputStreams[p] = std::make_unique( + *pool_, listeners[p].get(), kHeaderSize + serializedSize); + const auto beginOffset = outputStreams[p]->tellp(); + + flushStart(*outputStreams[p], rowsPerPartition_[p], codecMask); + + for (auto range : *serializedBuffer) { + outputStreams[p]->write( + reinterpret_cast(range.data()), range.size()); + } + + flushFinish( + *outputStreams[p], + rowsPerPartition_[p], + beginOffset, + uncompressedSize, + codecMask, + *listeners[p]); + result[p] = + std::make_pair(outputStreams[p]->getIOBuf(), rowsPerPartition_[p]); + } + + return result; +} + +// --------------------------------------------------------------------------- +// Second level functions: start, columns and finish +// --------------------------------------------------------------------------- + +void PrestoIterativePartitioningSerializer::flushStart( + IOBufOutputStream& out, + int32_t numRows, + char codecMask) const { + auto* listener = dynamic_cast(out.listener()); + if (listener) { + listener->pause(); + } + + // Write 21-byte Presto page header; sizes and CRC are filled in later. 
+ char header[kHeaderSize] = {}; + std::memcpy(&header[0], &numRows, 4); + std::memcpy(&header[4], &codecMask, 1); + out.write(header, kHeaderSize); + + if (listener) { + listener->resume(); + } +} + +void PrestoIterativePartitioningSerializer::flushRowChildren( + const std::vector& partitionedVectors, + const RowType& rowSchema, + const std::vector& nonEmptyPartitions, + const std::vector& outputStreams) const { + for (uint32_t col = 0; col < rowSchema.size(); ++col) { + std::vector column; + column.reserve(partitionedVectors.size()); + for (const auto& partitionedVector : partitionedVectors) { + const auto& partitionedRowVector = + std::dynamic_pointer_cast(partitionedVector); + VELOX_DCHECK_NOT_NULL(partitionedRowVector.get()); + column.push_back(partitionedRowVector->childAt(col)); + } + + flushColumn( + column, rowSchema.childAt(col), nonEmptyPartitions, outputStreams); + } +} + +void PrestoIterativePartitioningSerializer::flushFinish( + IOBufOutputStream& out, + int32_t numRows, + std::streampos beginOffset, + int32_t uncompressedSize, + char codecMask, + PrestoOutputStreamListener& listener) const { + listener.pause(); + + const auto endOffset = out.tellp(); + const auto serializedSize = + static_cast(out.tellp() - beginOffset - kHeaderSize); + const int64_t crc = computeChecksum( + listener, static_cast(codecMask), numRows, uncompressedSize); + + out.seekp(beginOffset + kUncompressedSizeOffset); + writeInt32(&out, uncompressedSize); + writeInt32(&out, serializedSize); + writeInt64(&out, crc); + out.seekp(endOffset); +} + +// --------------------------------------------------------------------------- +// Column-level dispatch +// --------------------------------------------------------------------------- + +void PrestoIterativePartitioningSerializer::flushColumn( + const std::vector& partitionedVectors, + const TypePtr& colType, + const std::vector& nonEmptyPartitions, + const std::vector& outputStreams) const { + VELOX_CHECK_GT(partitionedVectors.size(), 
0); + + // Dispatch on the type kind of the first batch's base vector. + // NOTE(review): assumes every batch of this column has the same kind. + auto typeKind = partitionedVectors[0]->baseVector()->typeKind(); + switch (typeKind) { + // Fixed-width scalar kinds share one serialization path. + case TypeKind::BOOLEAN: + case TypeKind::TINYINT: + case TypeKind::SMALLINT: + case TypeKind::INTEGER: + case TypeKind::BIGINT: + case TypeKind::REAL: + case TypeKind::DOUBLE: + case TypeKind::HUGEINT: + flushSimpleColumn( + partitionedVectors, colType, nonEmptyPartitions, outputStreams); + break; + + // Recognized kinds whose serialization is not implemented yet. The message + // names the kind so the failure is actionable. + case TypeKind::TIMESTAMP: + case TypeKind::VARCHAR: + case TypeKind::VARBINARY: + case TypeKind::ROW: + case TypeKind::ARRAY: + case TypeKind::MAP: + VELOX_NYI( + "flushColumn: type kind {} is not yet supported", + TypeKindName::toName(typeKind)); + + // The value is a type kind, not a vector encoding, and it needs a {} + // placeholder to appear in the message. + default: + VELOX_UNSUPPORTED( + "Invalid type kind for OptimizedPartitionedOutput: {}", + TypeKindName::toName(typeKind)); + } +} + +void PrestoIterativePartitioningSerializer::flushSimpleColumn( + const std::vector& partitionedVectors, + const TypePtr& colType, + const std::vector& nonEmptyPartitions, + const std::vector& outputStreams) const { + flushHeader(typeToEncodingName(colType), nonEmptyPartitions, outputStreams); + flushRowCounts(nonEmptyPartitions, outputStreams); + flushNulls(partitionedVectors, nonEmptyPartitions, outputStreams); + + for (size_t i = 0; i < partitionedVectors.size(); i++) { + flushSingleSimpleVector(partitionedVectors[i], outputStreams); + } +} + +template +void PrestoIterativePartitioningSerializer::flushSingleFlatVector( + const PartitionedVectorPtr& partitionedVector, + const std::vector& outputStreams) const { + using T = typename TypeTraits::NativeType; + auto* flatVector = partitionedVector->as>(); + VELOX_DCHECK_NOT_NULL(flatVector); + + const auto* rawValues = + flatVector->baseVector()->template as>()->rawValues(); + const auto* rawNulls = flatVector->baseVector()->rawNulls(); + const auto* partitionOffsets = flatVector->rawPartitionOffsets(); + + flushFlatValues(rawValues, rawNulls, partitionOffsets, outputStreams); +} + +// BOOLEAN columns use kByteArray encoding: FlatVector stores bits +// packed, so rawValues() is unsupported.
Each non-null value is written as +// one byte (0x00 or 0x01). +template <> +void PrestoIterativePartitioningSerializer::flushSingleFlatVector< + TypeKind::BOOLEAN>( + const PartitionedVectorPtr& partitionedVector, + const std::vector& outputStreams) const { + auto* flatVector = partitionedVector->as>(); + VELOX_DCHECK_NOT_NULL(flatVector); + + const auto* rawBoolValues = + flatVector->baseVector()->as>()->rawValues(); + const auto* rawNulls = flatVector->baseVector()->rawNulls(); + const auto* partitionOffsets = flatVector->rawPartitionOffsets(); + + // TODO: Improve performance + vector_size_t lastOffset = 0; + for (uint32_t p = 0; p < numPartitions_; ++p) { + const auto offset = partitionOffsets[p]; + const auto numValues = offset - lastOffset; + const auto numNulls = partitionedVector->numNullsAt(p); + if (outputStreams[p] != nullptr && numValues > 0) { + if (numNulls == 0) { + for (vector_size_t i = lastOffset; i < offset; ++i) { + const int8_t val = bits::isBitSet(rawBoolValues, i) ? 1 : 0; + outputStreams[p]->write(reinterpret_cast(&val), 1); + } + } else { + VELOX_DCHECK_NOT_NULL(rawNulls); + for (vector_size_t i = lastOffset; i < offset; ++i) { + if (!bits::isBitNull(rawNulls, i)) { + const int8_t val = bits::isBitSet(rawBoolValues, i) ? 
1 : 0; + outputStreams[p]->write(reinterpret_cast(&val), 1); + } + } + } + } + lastOffset = offset; + } +} + +void PrestoIterativePartitioningSerializer::flushSingleSimpleVector( + const PartitionedVectorPtr& partitionedVector, + const std::vector& outputStreams) const { + auto encoding = partitionedVector->baseVector()->encoding(); + auto typeKind = partitionedVector->baseVector()->typeKind(); + + switch (encoding) { + case VectorEncoding::Simple::FLAT: + VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH( + flushSingleFlatVector, typeKind, partitionedVector, outputStreams); + break; + case VectorEncoding::Simple::BIASED: + case VectorEncoding::Simple::CONSTANT: + case VectorEncoding::Simple::DICTIONARY: + case VectorEncoding::Simple::SEQUENCE: + VELOX_NYI( + "Unsupported vector encoding for OptimizedPartitionedOutput: ", + encoding); + default: + VELOX_UNSUPPORTED( + "Invalid vector encoding for OptimizedPartitionedOutput:flushSingleSimpleVector ", + encoding); + } +} + +// --------------------------------------------------------------------------- +// Column building blocks +// --------------------------------------------------------------------------- + +void PrestoIterativePartitioningSerializer::flushHeader( + std::string_view name, + const std::vector& nonEmptyPartitions, + const std::vector& outputStreams) const { + const int32_t nameLen = static_cast(name.size()); + for (uint32_t p : nonEmptyPartitions) { + writeInt32(outputStreams[p], nameLen); + outputStreams[p]->write(name.data(), nameLen); + } +} + +void PrestoIterativePartitioningSerializer::flushRowCounts( + const std::vector& nonEmptyPartitions, + const std::vector& outputStreams) const { + for (uint32_t p : nonEmptyPartitions) { + writeInt32(outputStreams[p], static_cast(rowsPerPartition_[p])); + } +} + +void PrestoIterativePartitioningSerializer::flushNulls( + const std::vector& partitionedVectors, + const std::vector& nonEmptyPartitions, + const std::vector& outputStreams) const { + std::vector 
nullCounts(numPartitions_, 0); + // Sum null counts across all batches and write the one-byte has-nulls flag + // for every non-empty partition. + for (uint32_t p : nonEmptyPartitions) { + for (const auto& pv : partitionedVectors) { + nullCounts[p] += pv->numNullsAt(p); + } + const char flagByte = nullCounts[p] > 0 ? 1 : 0; + outputStreams[p]->write(&flagByte, 1); + } + + const bool hasAnyNulls = std::any_of( + nonEmptyPartitions.begin(), nonEmptyPartitions.end(), [&](uint32_t p) { + return nullCounts[p] > 0; + }); + if (!hasAnyNulls) { + return; + } + + // Build each partition's null bitmap in a temporary buffer, accumulating + // bits across all batches. Writing via write() correctly handles range + // boundaries in the output stream without requiring seekp(). + // TODO: Avoid this extra memory allocation and copy + std::vector> bitmaps(numPartitions_); + for (uint32_t p : nonEmptyPartitions) { + if (nullCounts[p] > 0) { + // Start with every bit set, i.e. not-null in Velox format. A batch without + // a nulls buffer is skipped by the copy below, so its rows must default to + // not-null; zero-filled bits would turn into nulls after the inversion to + // Presto format while the values of those rows are still written. This + // also leaves the padding bits of the last byte cleared on the wire. + bitmaps[p].assign(bits::nbytes(rowsPerPartition_[p]), 0xFF); + } + } + + // Next free bit position in each partition's bitmap. + std::vector destBitOffsets(numPartitions_, 0); + for (const auto& pv : partitionedVectors) { + const uint64_t* rawNulls = pv->baseVector()->rawNulls(); + const auto* partitionOffsets = pv->rawPartitionOffsets(); + + vector_size_t startBit = 0; + for (uint32_t p : nonEmptyPartitions) { + const vector_size_t numBits = partitionOffsets[p] - startBit; + if (rawNulls && numBits > 0 && !bitmaps[p].empty()) { + bits::copyBits( + rawNulls, + startBit, + reinterpret_cast(bitmaps[p].data()), + destBitOffsets[p], + numBits); + } + // Advance even when nothing was copied so later batches land after this + // batch's rows. + if (!bitmaps[p].empty()) { + destBitOffsets[p] += numBits; + } + startBit = partitionOffsets[p]; + } + } + + for (uint32_t p : nonEmptyPartitions) { + if (nullCounts[p] == 0) { + continue; + } + + // Convert Velox format (LSB-first, 1=not-null) to Presto wire format + // (MSB-first, 1=null) in-place.
+ const int32_t numBytes = bits::nbytes(rowsPerPartition_[p]); + for (int32_t i = 0; i < numBytes; ++i) { + bitmaps[p][i] = ~bitmaps[p][i]; + bits::reverseBits(&bitmaps[p][i], 1); + } + + outputStreams[p]->write( + reinterpret_cast(bitmaps[p].data()), numBytes); + } +} + +template +void PrestoIterativePartitioningSerializer::flushFlatValues( + const T* partitionedValues, + const uint64_t* rawNulls, + const vector_size_t* partitionOffsets, + const std::vector& outputStreams) const { + const auto typeWidth = sizeof(T); + vector_size_t lastOffset = 0; + for (uint32_t p = 0; p < numPartitions_; ++p) { + const auto offset = partitionOffsets[p]; + const auto numValues = offset - lastOffset; + if (outputStreams[p] != nullptr && numValues > 0) { + if (!rawNulls) { + outputStreams[p]->write( + reinterpret_cast(&partitionedValues[lastOffset]), + numValues * typeWidth); + } else { + // Presto writes only non-null values; null slots are omitted. + // TODO: Improve performance + for (vector_size_t i = lastOffset; i < offset; ++i) { + if (!bits::isBitNull(rawNulls, i)) { + outputStreams[p]->write( + reinterpret_cast(&partitionedValues[i]), + typeWidth); + } + } + } + } + lastOffset = offset; + } +} + +void PrestoIterativePartitioningSerializer::flushSequentialOffsets( + const std::vector& nonEmptyPartitions, + const std::vector& outputStreams) const { + for (uint32_t p : nonEmptyPartitions) { + const int32_t numRows = static_cast(rowsPerPartition_[p]); + for (int32_t i = 0; i <= numRows; ++i) { + writeInt32(outputStreams[p], i); + } + } +} + +} // namespace facebook::velox::serializer::presto diff --git a/velox/serializers/PrestoIterativePartitioningSerializer.h b/velox/serializers/PrestoIterativePartitioningSerializer.h new file mode 100644 index 00000000000..9fc0f1d57b7 --- /dev/null +++ b/velox/serializers/PrestoIterativePartitioningSerializer.h @@ -0,0 +1,165 @@ +/* + * Copyright (c) International Business Machines Corporation + * + * Licensed under the Apache License, 
Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include + +#include + +#include "velox/common/memory/ByteStream.h" +#include "velox/serializers/PrestoSerializer.h" +#include "velox/type/Type.h" +#include "velox/vector/PartitionedVector.h" + +namespace facebook::velox::serializer::presto { + +/// Convenience alias matching PrestoSerializer.cpp convention. +using SerdeOpts = PrestoVectorSerde::PrestoOptions; + +/// Serializes a stream of RowVectors into per-partition Presto pages. +/// +/// Each call to append() routes rows to their assigned partition. flush() +/// produces one Presto-format IOBuf per non-empty partition and resets the +/// internal state so the serializer can be reused for the next cycle. +class PrestoIterativePartitioningSerializer { + public: + PrestoIterativePartitioningSerializer( + RowTypePtr inputType, + uint32_t numPartitions, + const SerdeOpts& opts, + memory::MemoryPool* pool); + + /// Routes each row in `input` to the partition indicated by `partitions`. + /// `partitions.size()` must equal `input->size()`. + void append( + const RowVectorPtr& input, + const std::vector& partitions); + + /// Serializes all buffered data into one Presto page per non-empty partition + /// and resets internal state. Returns an empty map if nothing has been + /// appended since the last flush. + std::map, vector_size_t>> + flush(); + + /// Returns the total retained bytes of all appended input vectors. 
+ int64_t bytesBuffered() const { + return bytesBuffered_; + } + + /// Returns the total number of rows appended since the last flush. + int64_t rowsBuffered() const { + return rowsBuffered_; + } + + /// Returns the number of rows buffered for the given partition. + /// Must be called before flush(), which resets per-partition counts. + int64_t rowsPerPartition(uint32_t partition) const { + VELOX_DCHECK_LT(partition, numPartitions_); + return rowsPerPartition_[partition]; + } + + private: + std::map, vector_size_t>> + flushUncompressed(); + std::map, vector_size_t>> + flushCompressed(); + + void flushStart(IOBufOutputStream& out, int32_t numRows, char codecMask) + const; + + void flushFinish( + IOBufOutputStream& out, + int32_t numRows, + std::streampos beginOffset, + int32_t uncompressedSize, + char codecMask, + PrestoOutputStreamListener& listener) const; + + void flushRowChildren( + const std::vector& partitionedVectors, + const RowType& rowSchema, + const std::vector& nonEmptyPartitions, + const std::vector& outputStreams) const; + + void flushColumn( + const std::vector& partitionedVectors, + const TypePtr& colType, + const std::vector& nonEmptyPartitions, + const std::vector& outputStreams) const; + + void flushSimpleColumn( + const std::vector& partitionedVectors, + const TypePtr& colType, + const std::vector& nonEmptyPartitions, + const std::vector& outputStreams) const; + + void flushSingleSimpleVector( + const PartitionedVectorPtr& partitionedVector, + const std::vector& outputStreams) const; + + template + void flushSingleFlatVector( + const PartitionedVectorPtr& partitionedVector, + const std::vector& outputStreams) const; + + void flushHeader( + std::string_view name, + const std::vector& nonEmptyPartitions, + const std::vector& outputStreams) const; + + void flushRowCounts( + const std::vector& nonEmptyPartitions, + const std::vector& outputStreams) const; + + void flushNulls( + const std::vector& partitionedVectors, + const std::vector& 
nonEmptyPartitions, + const std::vector& outputStreams) const; + + template + void flushFlatValues( + const T* partitionedValues, + const uint64_t* rawNulls, + const vector_size_t* partitionOffsets, + const std::vector& outputStreams) const; + + void flushSequentialOffsets( + const std::vector& nonEmptyPartitions, + const std::vector& outputStreams) const; + + RowTypePtr type_; + uint32_t numPartitions_; + SerdeOpts opts_; + memory::MemoryPool* pool_; + + /// Cumulative row count per partition across all appended batches. + std::vector rowsPerPartition_; + + /// Number of top-level columns in `type_`. + uint32_t numColumns_{0}; + + std::vector partitionedRowVectors_; + + int64_t bytesBuffered_{0}; + int64_t rowsBuffered_{0}; + + /// Per-column, per-partition exact byte counts computed during flush. + std::vector> flushSizes_; +}; + +} // namespace facebook::velox::serializer::presto diff --git a/velox/serializers/benchmarks/CMakeLists.txt b/velox/serializers/benchmarks/CMakeLists.txt index 7d1044e4367..a81530595e8 100644 --- a/velox/serializers/benchmarks/CMakeLists.txt +++ b/velox/serializers/benchmarks/CMakeLists.txt @@ -21,3 +21,17 @@ target_link_libraries( Folly::folly Folly::follybenchmark ) + +add_executable( + velox_presto_iterative_partitioning_serializer_benchmark + PrestoIterativePartitioningSerializerBenchmark.cpp +) + +target_link_libraries( + velox_presto_iterative_partitioning_serializer_benchmark + velox_presto_serializer + velox_vector_test_lib + velox_memory + Folly::folly + Folly::follybenchmark +) diff --git a/velox/serializers/benchmarks/PrestoIterativePartitioningSerializerBenchmark.cpp b/velox/serializers/benchmarks/PrestoIterativePartitioningSerializerBenchmark.cpp new file mode 100644 index 00000000000..3244281a5dc --- /dev/null +++ b/velox/serializers/benchmarks/PrestoIterativePartitioningSerializerBenchmark.cpp @@ -0,0 +1,177 @@ +/* + * Copyright (c) International Business Machines Corporation + * + * Licensed under the Apache License, 
Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include + +#include "velox/serializers/PrestoIterativePartitioningSerializer.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +using namespace facebook::velox; +using namespace facebook::velox::serializer::presto; + +constexpr int64_t kBufferSize = 2 * 1024 * 1024; + +namespace { + +class PrestoIterativePartitioningSerializerBenchmark + : public test::VectorTestBase { + public: + /// Creates a flat vector of type T with deterministic null pattern. + /// Rows where (row % 100) < nullPct are null. + template + VectorPtr makeColumnOfType(vector_size_t size, int32_t nullPct) { + if (nullPct == 0) { + return makeFlatVector( + size, [](auto row) { return static_cast(row); }); + } + return makeFlatVector( + size, + [](auto row) { return static_cast(row); }, + [nullPct](auto row) { return (row % 100) < nullPct; }); + } + + /// Creates a flat vector of the given TypeKind with the given null ratio. + VectorPtr makeColumn(vector_size_t size, TypeKind colKind, int32_t nullPct) { + switch (colKind) { + case TypeKind::BOOLEAN: + return makeColumnOfType(size, nullPct); + case TypeKind::INTEGER: + return makeColumnOfType(size, nullPct); + case TypeKind::BIGINT: + return makeColumnOfType(size, nullPct); + case TypeKind::HUGEINT: + return makeColumnOfType(size, nullPct); + default: + VELOX_UNSUPPORTED( + "Unsupported TypeKind: {}", TypeKindName::toName(colKind)); + } + } + + /// Creates a RowVector with numCols columns of the given TypeKind. 
+ RowVectorPtr makeInput( + vector_size_t size, + TypeKind colKind, + uint32_t numCols, + int32_t nullPct) { + std::vector names; + std::vector children; + names.reserve(numCols); + children.reserve(numCols); + for (uint32_t i = 0; i < numCols; ++i) { + names.push_back(fmt::format("c{}", i)); + children.push_back(makeColumn(size, colKind, nullPct)); + } + return makeRowVector(names, children); + } + + std::vector makePartitions( + vector_size_t size, + uint32_t numPartitions) { + std::vector partitions(size); + for (vector_size_t i = 0; i < size; ++i) { + partitions[i] = i % numPartitions; + } + return partitions; + } + + std::unique_ptr makeSerializer( + const RowTypePtr& type, + uint32_t numPartitions) { + SerdeOpts opts; + return std::make_unique( + type, numPartitions, opts, pool_.get()); + } +}; + +} // namespace + +/// Single benchmark function parameterized by (colKind, numCols, nullPct, +/// numPartitions). Registered via BENCHMARK_NAMED_PARAM below. +/// +/// All runs use 10'000 rows. Setup (input creation, serializer construction, +/// append) is excluded from the measured time. +void benchmarkFlush( + uint32_t /* iters */, + TypeKind colKind, + uint32_t numCols, + int32_t nullPct, + uint32_t numPartitions) { + folly::BenchmarkSuspender suspender; + PrestoIterativePartitioningSerializerBenchmark benchmark; + auto input = benchmark.makeInput(10'000, colKind, numCols, nullPct); + auto parts = benchmark.makePartitions(10'000, numPartitions); + auto serializer = benchmark.makeSerializer( + std::static_pointer_cast(input->type()), numPartitions); + + while (serializer->bytesBuffered() < kBufferSize) { + serializer->append(input, parts); + } + + suspender.dismiss(); + + auto result = serializer->flush(); + folly::doNotOptimizeAway(result); +} + +// clang-format off +// Dimensions: +// col type: {bool, int, bigint, hugeint} +// num cols: {1, 4, 16, 64} +// null pct: {0, 25, 50, 75, 100} +// num partitions: {1, 4, 16, 64, 256, 1024} +// +// Naming: flush__cols_

pct_parts + +#define FLUSH_PARAM(type_name, kind, num_cols, null_pct, num_parts) \ + BENCHMARK_NAMED_PARAM( \ + benchmarkFlush, \ + type_name## _## num_cols## cols_## null_pct## pct_## num_parts## parts, \ + TypeKind::kind, num_cols, null_pct, num_parts) + +#define FLUSH_FOR_PARTS(type_name, kind, num_cols, null_pct) \ + FLUSH_PARAM(type_name, kind, num_cols, null_pct, 1) \ + FLUSH_PARAM(type_name, kind, num_cols, null_pct, 4) \ + FLUSH_PARAM(type_name, kind, num_cols, null_pct, 16) \ + FLUSH_PARAM(type_name, kind, num_cols, null_pct, 64) \ + FLUSH_PARAM(type_name, kind, num_cols, null_pct, 256) \ + FLUSH_PARAM(type_name, kind, num_cols, null_pct, 1024) + +#define FLUSH_FOR_NULLS(type_name, kind, num_cols) \ + FLUSH_FOR_PARTS(type_name, kind, num_cols, 0) \ + FLUSH_FOR_PARTS(type_name, kind, num_cols, 25) \ + FLUSH_FOR_PARTS(type_name, kind, num_cols, 50) \ + FLUSH_FOR_PARTS(type_name, kind, num_cols, 75) \ + FLUSH_FOR_PARTS(type_name, kind, num_cols, 100) + +#define FLUSH_FOR_COLS(type_name, kind) \ + FLUSH_FOR_NULLS(type_name, kind, 1) \ + FLUSH_FOR_NULLS(type_name, kind, 4) \ + FLUSH_FOR_NULLS(type_name, kind, 16) \ + FLUSH_FOR_NULLS(type_name, kind, 64) + +FLUSH_FOR_COLS(bool, BOOLEAN) +FLUSH_FOR_COLS(int, INTEGER) +FLUSH_FOR_COLS(bigint, BIGINT) +FLUSH_FOR_COLS(ldec, HUGEINT) +// clang-format on + +int main(int argc, char** argv) { + folly::Init init{&argc, &argv}; + memory::MemoryManager::initialize(memory::MemoryManager::Options{}); + PrestoVectorSerde::registerVectorSerde(); + folly::runBenchmarks(); + return 0; +} diff --git a/velox/serializers/tests/CMakeLists.txt b/velox/serializers/tests/CMakeLists.txt index d54e9ff9574..66eaa6d5bd4 100644 --- a/velox/serializers/tests/CMakeLists.txt +++ b/velox/serializers/tests/CMakeLists.txt @@ -35,6 +35,7 @@ target_link_libraries( add_executable( velox_serializer_test CompactRowSerializerTest.cpp + PrestoIterativePartitioningSerializerTest.cpp PrestoOutputStreamListenerTest.cpp PrestoSerializerTest.cpp 
SerializedPageFileTest.cpp @@ -52,6 +53,7 @@ target_link_libraries( velox_row_fast GTest::gtest GTest::gtest_main + GTest::gmock glog::glog ) diff --git a/velox/serializers/tests/PrestoIterativePartitioningSerializerTest.cpp b/velox/serializers/tests/PrestoIterativePartitioningSerializerTest.cpp new file mode 100644 index 00000000000..a35c0911615 --- /dev/null +++ b/velox/serializers/tests/PrestoIterativePartitioningSerializerTest.cpp @@ -0,0 +1,756 @@ +/* + * Copyright (c) International Business Machines Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include + +#include "velox/serializers/PrestoHeader.h" +#include "velox/serializers/PrestoIterativePartitioningSerializer.h" +#include "velox/serializers/PrestoSerializerDeserializationUtils.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +using namespace facebook::velox; +using namespace facebook::velox::serializer::presto; +using namespace facebook::velox::test; + +class PrestoIterativePartitioningSerializerTest : public ::testing::Test, + public VectorTestBase { + protected: + static void SetUpTestSuite() { + memory::MemoryManager::testingSetInstance(memory::MemoryManager::Options{}); + if (!isRegisteredVectorSerde()) { + PrestoVectorSerde::registerVectorSerde(); + } + } + + /// Deserializes an IOBuf produced by PartitioningSerializer::flush(). 
+ RowVectorPtr deserialize( + folly::IOBuf& iobuf, + const RowTypePtr& type, + const SerdeOpts* opts = {}) { + auto ranges = byteRangesFromIOBuf(&iobuf); + BufferInputStream stream(std::move(ranges)); + RowVectorPtr result; + serde_.deserialize(&stream, pool_.get(), type, &result, opts); + return result; + } + + /// Extracts flat values from a column into a sorted vector. + template + std::vector sortedValues(const RowVectorPtr& row, int column) { + auto* flat = row->childAt(column)->as>(); + std::vector vals(flat->rawValues(), flat->rawValues() + row->size()); + std::sort(vals.begin(), vals.end()); + return vals; + } + + /// Extracts values from a nullable column, preserving order and nulls. + template + std::vector> nullableValues( + const RowVectorPtr& row, + int column) { + auto* vec = row->childAt(column).get(); + std::vector> result; + result.reserve(row->size()); + for (int i = 0; i < row->size(); ++i) { + if (vec->isNullAt(i)) { + result.push_back(std::nullopt); + } else { + result.push_back(vec->as>()->valueAt(i)); + } + } + return result; + } + + /// Builds a PrestoIterativePartitioningSerializer with default serde options. + std::unique_ptr makeSerializer( + const RowTypePtr& type, + uint32_t numPartitions, + const SerdeOpts& opts = {}) { + return std::make_unique( + type, numPartitions, opts, pool_.get()); + } + + PrestoVectorSerde serde_; +}; + +// ── Routing ────────────────────────────────────────────────────────────────── + +// Single append, two equal-sized partitions. +TEST_F(PrestoIterativePartitioningSerializerTest, basicTwoPartitions) { + auto type = ROW({"a"}, {BIGINT()}); + auto input = + makeRowVector({"a"}, {makeFlatVector({10, 20, 30, 40, 50, 60})}); + + // Even rows → partition 0, odd rows → partition 1. 
+ auto serializer = makeSerializer(type, 2); + serializer->append(input, {0, 1, 0, 1, 0, 1}); + + EXPECT_EQ(serializer->rowsBuffered(), 6); + + auto ioBufs = serializer->flush(); + ASSERT_EQ(ioBufs.size(), 2); + + EXPECT_EQ(serializer->rowsBuffered(), 0); + EXPECT_EQ(serializer->bytesBuffered(), 0); + + auto p0 = deserialize(*ioBufs.at(0).first, type); + auto p1 = deserialize(*ioBufs.at(1).first, type); + + ASSERT_EQ(p0->size(), 3); + ASSERT_EQ(p1->size(), 3); + + EXPECT_EQ(sortedValues(p0, 0), (std::vector{10, 30, 50})); + EXPECT_EQ(sortedValues(p1, 0), (std::vector{20, 40, 60})); +} + +// All rows routed to one non-zero partition; other partitions are absent. +TEST_F(PrestoIterativePartitioningSerializerTest, allRowsToOnePartition) { + auto type = ROW({"x"}, {INTEGER()}); + auto input = makeRowVector({"x"}, {makeFlatVector({1, 2, 3, 4, 5})}); + + auto serializer = makeSerializer(type, 4); + serializer->append(input, {2, 2, 2, 2, 2}); + auto ioBufs = serializer->flush(); + + ASSERT_EQ(ioBufs.size(), 1); + ASSERT_TRUE(ioBufs.count(2)); + + auto result = deserialize(*ioBufs.at(2).first, type); + ASSERT_EQ(result->size(), 5); + EXPECT_EQ( + sortedValues(result, 0), (std::vector{1, 2, 3, 4, 5})); +} + +// Single partition (numPartitions=1): all rows go to partition 0. +TEST_F(PrestoIterativePartitioningSerializerTest, singlePartition) { + auto type = ROW({"a"}, {BIGINT()}); + auto input = makeRowVector({"a"}, {makeFlatVector({1, 2, 3, 4, 5})}); + + auto serializer = makeSerializer(type, 1); + serializer->append(input, std::vector(5, 0)); + auto ioBufs = serializer->flush(); + + ASSERT_EQ(ioBufs.size(), 1); + auto result = deserialize(*ioBufs.at(0).first, type); + ASSERT_EQ(result->size(), 5); + EXPECT_EQ( + sortedValues(result, 0), (std::vector{1, 2, 3, 4, 5})); +} + +// Multiple append() calls accumulate correctly before flush. 
+TEST_F(PrestoIterativePartitioningSerializerTest, multipleAppends) { + auto type = ROW({"v"}, {BIGINT()}); + auto serializer = makeSerializer(type, 3); + + serializer->append( + makeRowVector({"v"}, {makeFlatVector({100, 200, 300})}), + {0, 1, 2}); + serializer->append( + makeRowVector({"v"}, {makeFlatVector({400, 500, 600})}), + {2, 0, 1}); + + EXPECT_EQ(serializer->rowsBuffered(), 6); + + auto ioBufs = serializer->flush(); + ASSERT_EQ(ioBufs.size(), 3); + + auto r0 = deserialize(*ioBufs.at(0).first, type); + auto r1 = deserialize(*ioBufs.at(1).first, type); + auto r2 = deserialize(*ioBufs.at(2).first, type); + + ASSERT_EQ(r0->size(), 2); + ASSERT_EQ(r1->size(), 2); + ASSERT_EQ(r2->size(), 2); + + EXPECT_EQ(sortedValues(r0, 0), (std::vector{100, 500})); + EXPECT_EQ(sortedValues(r1, 0), (std::vector{200, 600})); + EXPECT_EQ(sortedValues(r2, 0), (std::vector{300, 400})); +} + +// Multiple columns: each is serialized independently by flushRowChildren. +TEST_F(PrestoIterativePartitioningSerializerTest, multipleColumns) { + auto type = ROW({"a", "b"}, {INTEGER(), BIGINT()}); + auto input = makeRowVector( + {"a", "b"}, + {makeFlatVector({1, 2, 3, 4}), + makeFlatVector({10, 20, 30, 40})}); + + auto serializer = makeSerializer(type, 2); + serializer->append(input, {0, 0, 1, 1}); + auto ioBufs = serializer->flush(); + + ASSERT_EQ(ioBufs.size(), 2); + + auto r0 = deserialize(*ioBufs.at(0).first, type); + ASSERT_EQ(r0->size(), 2); + EXPECT_THAT(sortedValues(r0, 0), testing::ElementsAre(1, 2)); + EXPECT_THAT(sortedValues(r0, 1), testing::ElementsAre(10, 20)); + + auto r1 = deserialize(*ioBufs.at(1).first, type); + ASSERT_EQ(r1->size(), 2); + EXPECT_THAT(sortedValues(r1, 0), testing::ElementsAre(3, 4)); + EXPECT_THAT(sortedValues(r1, 1), testing::ElementsAre(30, 40)); +} + +// ── Empty state +// ─────────────────────────────────────────────────────────────── + +// Flushing an empty serializer returns an empty map. 
+TEST_F(PrestoIterativePartitioningSerializerTest, flushEmpty) { + auto type = ROW({"a"}, {BIGINT()}); + auto serializer = makeSerializer(type, 3); + EXPECT_TRUE(serializer->flush().empty()); +} + +// Appending an empty RowVector produces no ioBufs on flush. +TEST_F(PrestoIterativePartitioningSerializerTest, appendEmptyVector) { + auto type = ROW({"a"}, {BIGINT()}); + auto serializer = makeSerializer(type, 2); + serializer->append(makeRowVector({"a"}, {makeFlatVector({})}), {}); + EXPECT_TRUE(serializer->flush().empty()); +} + +// ── Null handling +// ───────────────────────────────────────────────────────────── + +// Nulls appear only in one partition; the other partition is null-free. +TEST_F(PrestoIterativePartitioningSerializerTest, nullsInOnePartition) { + auto type = ROW({"a"}, {BIGINT()}); + using opt = std::optional; + // Rows 0,1,2 → partition 0; rows 3,4 → partition 1. + // Partition 0 has a null at position 1; partition 1 has no nulls. + auto input = makeRowVector( + {"a"}, {makeNullableFlatVector({10, std::nullopt, 30, 40, 50})}); + + auto serializer = makeSerializer(type, 2); + serializer->append(input, {0, 0, 0, 1, 1}); + auto ioBufs = serializer->flush(); + + ASSERT_EQ(ioBufs.size(), 2); + + auto r0 = deserialize(*ioBufs.at(0).first, type); + ASSERT_EQ(r0->size(), 3); + EXPECT_THAT( + nullableValues(r0, 0), + testing::ElementsAre(opt{10}, std::nullopt, opt{30})); + + auto r1 = deserialize(*ioBufs.at(1).first, type); + ASSERT_EQ(r1->size(), 2); + EXPECT_THAT( + nullableValues(r1, 0), testing::ElementsAre(opt{40}, opt{50})); +} + +// Both partitions contain nulls. +TEST_F(PrestoIterativePartitioningSerializerTest, nullsInBothPartitions) { + auto type = ROW({"a"}, {BIGINT()}); + using opt = std::optional; + // Rows 0,1 → partition 0; rows 2,3 → partition 1. + // Partition 0: [10, null]; partition 1: [null, 40]. 
+ auto input = makeRowVector( + {"a"}, + {makeNullableFlatVector({10, std::nullopt, std::nullopt, 40})}); + + auto serializer = makeSerializer(type, 2); + serializer->append(input, {0, 0, 1, 1}); + auto ioBufs = serializer->flush(); + + ASSERT_EQ(ioBufs.size(), 2); + + auto r0 = deserialize(*ioBufs.at(0).first, type); + EXPECT_THAT( + nullableValues(r0, 0), + testing::ElementsAre(opt{10}, std::nullopt)); + + auto r1 = deserialize(*ioBufs.at(1).first, type); + EXPECT_THAT( + nullableValues(r1, 0), + testing::ElementsAre(std::nullopt, opt{40})); +} + +// All rows in one partition are null. +TEST_F(PrestoIterativePartitioningSerializerTest, allNullsInPartition) { + auto type = ROW({"a"}, {BIGINT()}); + using opt = std::optional; + // Partition 0: two nulls. Partition 1: one non-null. + auto input = makeRowVector( + {"a"}, + {makeNullableFlatVector({std::nullopt, std::nullopt, 30})}); + + auto serializer = makeSerializer(type, 2); + serializer->append(input, {0, 0, 1}); + auto ioBufs = serializer->flush(); + + ASSERT_EQ(ioBufs.size(), 2); + + auto r0 = deserialize(*ioBufs.at(0).first, type); + EXPECT_THAT( + nullableValues(r0, 0), + testing::ElementsAre(std::nullopt, std::nullopt)); + + auto r1 = deserialize(*ioBufs.at(1).first, type); + EXPECT_THAT(nullableValues(r1, 0), testing::ElementsAre(opt{30})); +} + +// Nulls contributed by different appends to the same partition, exercising +// carry-over between vectors. +TEST_F(PrestoIterativePartitioningSerializerTest, nullsAcrossMultipleAppends) { + auto type = ROW({"a"}, {BIGINT()}); + using opt = std::optional; + auto serializer = makeSerializer(type, 2); + + // Append 1: rows 0,1 → partition 0; row 2 → partition 1. + // Partition 0 gets [10, null]; partition 1 gets [30]. + serializer->append( + makeRowVector( + {"a"}, {makeNullableFlatVector({10, std::nullopt, 30})}), + {0, 0, 1}); + + // Append 2: row 0 → partition 0; row 1 → partition 1. + // Partition 0 gets [null]; partition 1 gets [50]. 
+ serializer->append( + makeRowVector( + {"a"}, {makeNullableFlatVector({std::nullopt, 50})}), + {0, 1}); + + auto ioBufs = serializer->flush(); + ASSERT_EQ(ioBufs.size(), 2); + + // Partition 0 should have [10, null] from append 1 + [null] from append 2. + auto r0 = deserialize(*ioBufs.at(0).first, type); + ASSERT_EQ(r0->size(), 3); + EXPECT_THAT( + nullableValues(r0, 0), + testing::ElementsAre(opt{10}, std::nullopt, std::nullopt)); + + // Partition 1 should have [30] from append 1 + [50] from append 2. + auto r1 = deserialize(*ioBufs.at(1).first, type); + ASSERT_EQ(r1->size(), 2); + EXPECT_THAT( + nullableValues(r1, 0), testing::ElementsAre(opt{30}, opt{50})); +} + +// Partition boundary falls in the middle of a null-bitmap byte, exercising the +// bit-extraction carry-over logic. +TEST_F(PrestoIterativePartitioningSerializerTest, nullsUnalignedBoundary) { + auto type = ROW({"a"}, {BIGINT()}); + using opt = std::optional; + // 5 rows → partition 0, 4 rows → partition 1. The boundary at bit 5 is + // inside the first byte of the null bitmap. 
+ auto input = makeRowVector( + {"a"}, + {makeNullableFlatVector( + {10, + std::nullopt, + 30, + std::nullopt, + 50, + std::nullopt, + 70, + std::nullopt, + 90})}); + + auto serializer = makeSerializer(type, 2); + serializer->append(input, {0, 0, 0, 0, 0, 1, 1, 1, 1}); + auto ioBufs = serializer->flush(); + + ASSERT_EQ(ioBufs.size(), 2); + + auto r0 = deserialize(*ioBufs.at(0).first, type); + ASSERT_EQ(r0->size(), 5); + EXPECT_THAT( + nullableValues(r0, 0), + testing::ElementsAre( + opt{10}, std::nullopt, opt{30}, std::nullopt, opt{50})); + + auto r1 = deserialize(*ioBufs.at(1).first, type); + ASSERT_EQ(r1->size(), 4); + EXPECT_THAT( + nullableValues(r1, 0), + testing::ElementsAre(std::nullopt, opt{70}, std::nullopt, opt{90})); +} + +// ── BOOLEAN type +// ────────────────────────────────────────────────────────────── BOOLEAN +// columns use bit-packed FlatVector storage; the serializer writes each +// non-null value as a kByteArray byte (0x00 or 0x01). The tests below verify +// correct encoding, null handling, and partition routing. + +// Two partitions, no nulls: {T,F,T,F,T,F} alternating, even→p0, odd→p1. +TEST_F(PrestoIterativePartitioningSerializerTest, booleanNoNulls) { + auto type = ROW({"b"}, {BOOLEAN()}); + auto input = makeRowVector( + {"b"}, {makeFlatVector({true, false, true, false, true, false})}); + + auto serializer = makeSerializer(type, 2); + serializer->append(input, {0, 1, 0, 1, 0, 1}); + auto ioBufs = serializer->flush(); + + ASSERT_EQ(ioBufs.size(), 2); + + auto r0 = deserialize(*ioBufs.at(0).first, type); + ASSERT_EQ(r0->size(), 3); + EXPECT_THAT( + nullableValues(r0, 0), testing::ElementsAre(true, true, true)); + + auto r1 = deserialize(*ioBufs.at(1).first, type); + ASSERT_EQ(r1->size(), 3); + EXPECT_THAT( + nullableValues(r1, 0), testing::ElementsAre(false, false, false)); +} + +// Nulls mixed into both partitions. 
+TEST_F(PrestoIterativePartitioningSerializerTest, booleanWithNulls) { + auto type = ROW({"b"}, {BOOLEAN()}); + using opt = std::optional; + // p0: [T, null, F]; p1: [null, T]. + auto input = makeRowVector( + {"b"}, + {makeNullableFlatVector( + {true, std::nullopt, false, std::nullopt, true})}); + + auto serializer = makeSerializer(type, 2); + serializer->append(input, {0, 0, 0, 1, 1}); + auto ioBufs = serializer->flush(); + + ASSERT_EQ(ioBufs.size(), 2); + + auto r0 = deserialize(*ioBufs.at(0).first, type); + EXPECT_THAT( + nullableValues(r0, 0), + testing::ElementsAre(opt{true}, std::nullopt, opt{false})); + + auto r1 = deserialize(*ioBufs.at(1).first, type); + EXPECT_THAT( + nullableValues(r1, 0), + testing::ElementsAre(std::nullopt, opt{true})); +} + +// All values null in one partition; non-null values in the other. +TEST_F(PrestoIterativePartitioningSerializerTest, booleanAllNullsInPartition) { + auto type = ROW({"b"}, {BOOLEAN()}); + using opt = std::optional; + auto input = makeRowVector( + {"b"}, + {makeNullableFlatVector( + {std::nullopt, std::nullopt, true, false})}); + + auto serializer = makeSerializer(type, 2); + serializer->append(input, {0, 0, 1, 1}); + auto ioBufs = serializer->flush(); + + ASSERT_EQ(ioBufs.size(), 2); + + auto r0 = deserialize(*ioBufs.at(0).first, type); + EXPECT_THAT( + nullableValues(r0, 0), + testing::ElementsAre(std::nullopt, std::nullopt)); + + auto r1 = deserialize(*ioBufs.at(1).first, type); + EXPECT_THAT( + nullableValues(r1, 0), testing::ElementsAre(opt{true}, opt{false})); +} + +// Multiple BOOLEAN columns: each is serialized and deserialized independently. +TEST_F(PrestoIterativePartitioningSerializerTest, booleanMultipleColumns) { + auto type = ROW({"x", "y"}, {BOOLEAN(), BOOLEAN()}); + using opt = std::optional; + // 4 rows, 2 partitions. 
+ auto input = makeRowVector( + {"x", "y"}, + {makeNullableFlatVector({true, std::nullopt, false, true}), + makeNullableFlatVector( + {std::nullopt, false, true, std::nullopt})}); + + auto serializer = makeSerializer(type, 2); + serializer->append(input, {0, 0, 1, 1}); + auto ioBufs = serializer->flush(); + + ASSERT_EQ(ioBufs.size(), 2); + + auto r0 = deserialize(*ioBufs.at(0).first, type); + ASSERT_EQ(r0->size(), 2); + EXPECT_THAT( + nullableValues(r0, 0), + testing::ElementsAre(opt{true}, std::nullopt)); + EXPECT_THAT( + nullableValues(r0, 1), + testing::ElementsAre(std::nullopt, opt{false})); + + auto r1 = deserialize(*ioBufs.at(1).first, type); + ASSERT_EQ(r1->size(), 2); + EXPECT_THAT( + nullableValues(r1, 0), testing::ElementsAre(opt{false}, opt{true})); + EXPECT_THAT( + nullableValues(r1, 1), + testing::ElementsAre(opt{true}, std::nullopt)); +} + +// Verify compressed page header bits and round-trip deserialization. +TEST_F(PrestoIterativePartitioningSerializerTest, compressedRoundTrip) { + constexpr int32_t kNumRows = 5'000; + + SerdeOpts opts; + opts.compressionKind = common::CompressionKind::CompressionKind_ZLIB; + opts.minCompressionRatio = 0.99; + + std::vector values(kNumRows, 7); + std::vector partitions(kNumRows); + for (int32_t i = 0; i < kNumRows; ++i) { + partitions[i] = i % 2; + } + + auto type = ROW({"a", "b"}, {BIGINT(), BIGINT()}); + auto input = makeRowVector( + {"a", "b"}, + {makeFlatVector(values), makeFlatVector(values)}); + + auto serializer = makeSerializer(type, 2, opts); + serializer->append(input, partitions); + auto ioBufs = serializer->flush(); + + ASSERT_EQ(ioBufs.size(), 2); + + const std::vector expected(kNumRows / 2, 7); + for (uint32_t partition = 0; partition < 2; ++partition) { + auto ranges = byteRangesFromIOBuf(ioBufs.at(partition).first.get()); + BufferInputStream stream(std::move(ranges)); + auto maybeHeader = serializer::presto::detail::PrestoHeader::read(&stream); + ASSERT_TRUE(maybeHeader.hasValue()); + + const auto 
header = maybeHeader.value(); + EXPECT_TRUE( + serializer::presto::detail::isCompressedBitSet(header.pageCodecMarker)); + EXPECT_LT(header.compressedSize, header.uncompressedSize); + + auto result = deserialize(*ioBufs.at(partition).first, type, &opts); + ASSERT_EQ(result->size(), kNumRows / 2); + EXPECT_EQ(sortedValues(result, 0), expected); + EXPECT_EQ(sortedValues(result, 1), expected); + } +} + +// ── Lifecycle +// ───────────────────────────────────────────────────────────────── + +// Flush twice: second flush on empty state returns an empty map. +TEST_F(PrestoIterativePartitioningSerializerTest, flushTwice) { + auto type = ROW({"a"}, {BIGINT()}); + auto serializer = makeSerializer(type, 2); + serializer->append( + makeRowVector({"a"}, {makeFlatVector({10, 20})}), {0, 1}); + + auto ioBufs1 = serializer->flush(); + ASSERT_EQ(ioBufs1.size(), 2); + + EXPECT_TRUE(serializer->flush().empty()); +} + +// Append and flush multiple independent cycles. +TEST_F(PrestoIterativePartitioningSerializerTest, multipleCycles) { + auto type = ROW({"a"}, {INTEGER()}); + auto serializer = makeSerializer(type, 2); + + for (int cycle = 0; cycle < 3; ++cycle) { + serializer->append( + makeRowVector( + {"a"}, {makeFlatVector({cycle * 2, cycle * 2 + 1})}), + {0, 1}); + auto ioBufs = serializer->flush(); + ASSERT_EQ(ioBufs.size(), 2) << "cycle " << cycle; + + auto r0 = deserialize(*ioBufs.at(0).first, type); + auto r1 = deserialize(*ioBufs.at(1).first, type); + ASSERT_EQ(r0->size(), 1) << "cycle " << cycle; + ASSERT_EQ(r1->size(), 1) << "cycle " << cycle; + EXPECT_EQ(r0->childAt(0)->as>()->valueAt(0), cycle * 2); + EXPECT_EQ( + r1->childAt(0)->as>()->valueAt(0), cycle * 2 + 1); + } +} + +// ── Scale and regression +// ────────────────────────────────────────────────────── + +// 1024 partitions with random int64 values: verify every value reaches +// exactly the right partition and nothing is lost or duplicated. 
+TEST_F(PrestoIterativePartitioningSerializerTest, manyPartitionsRandom) { + constexpr uint32_t kNumPartitions = 1024; + constexpr int32_t kNumRows = 64'000; + + std::mt19937_64 rng(42); + std::uniform_int_distribution valueDist; + std::uniform_int_distribution partDist(0, kNumPartitions - 1); + + std::vector inputValues(kNumRows); + std::vector partitions(kNumRows); + // expected[p] holds the sorted values assigned to partition p. + std::vector> expected(kNumPartitions); + + for (int i = 0; i < kNumRows; ++i) { + inputValues[i] = valueDist(rng); + partitions[i] = partDist(rng); + expected[partitions[i]].push_back(inputValues[i]); + } + for (auto& v : expected) { + std::sort(v.begin(), v.end()); + } + + auto type = ROW({"v"}, {BIGINT()}); + auto input = makeRowVector({"v"}, {makeFlatVector(inputValues)}); + + auto serializer = makeSerializer(type, kNumPartitions); + serializer->append(input, partitions); + auto ioBufs = serializer->flush(); + + // Every non-empty partition must have a page; empty partitions must not. + for (uint32_t p = 0; p < kNumPartitions; ++p) { + if (expected[p].empty()) { + EXPECT_EQ(ioBufs.count(p), 0) << "partition " << p; + } else { + ASSERT_EQ(ioBufs.count(p), 1) << "partition " << p; + auto result = deserialize(*ioBufs.at(p).first, type); + ASSERT_EQ(result->size(), static_cast(expected[p].size())) + << "partition " << p; + EXPECT_EQ(sortedValues(result, 0), expected[p]) + << "partition " << p; + } + } +} + +// 1024 partitions with random int64 values and ~25% nulls: verify every +// value and null reaches exactly the right partition (order within a +// partition is not checked), and nothing is lost or duplicated. 
+TEST_F( + PrestoIterativePartitioningSerializerTest, + manyPartitionsRandomWithNulls) { + constexpr uint32_t kNumPartitions = 1024; + constexpr int32_t kNumRows = 64'000; + constexpr int32_t kNullPct = 25; + + std::mt19937_64 rng(43); + std::uniform_int_distribution valueDist; + std::uniform_int_distribution partDist(0, kNumPartitions - 1); + std::uniform_int_distribution nullDist(0, 99); + + std::vector> inputValues(kNumRows); + std::vector partitions(kNumRows); + // expected[p] holds the sequence of (value-or-null) assigned to partition p + // in input order. + std::vector>> expected(kNumPartitions); + + for (int i = 0; i < kNumRows; ++i) { + partitions[i] = partDist(rng); + if (nullDist(rng) < kNullPct) { + inputValues[i] = std::nullopt; + } else { + inputValues[i] = valueDist(rng); + } + expected[partitions[i]].push_back(inputValues[i]); + } + + auto type = ROW({"v"}, {BIGINT()}); + auto input = + makeRowVector({"v"}, {makeNullableFlatVector(inputValues)}); + + auto serializer = makeSerializer(type, kNumPartitions); + serializer->append(input, partitions); + auto ioBufs = serializer->flush(); + + // Partition rearranges values within each partition, so compare sorted. + // std::optional sorts with nullopt < any value, preserving null count. 
+ for (uint32_t p = 0; p < kNumPartitions; ++p) { + if (expected[p].empty()) { + EXPECT_EQ(ioBufs.count(p), 0) << "partition " << p; + } else { + ASSERT_EQ(ioBufs.count(p), 1) << "partition " << p; + auto result = deserialize(*ioBufs.at(p).first, type); + ASSERT_EQ(result->size(), static_cast(expected[p].size())) + << "partition " << p; + + auto expectedSorted = expected[p]; + std::sort(expectedSorted.begin(), expectedSorted.end()); + + auto actual = nullableValues(result, 0); + std::sort(actual.begin(), actual.end()); + + EXPECT_EQ(actual, expectedSorted) << "partition " << p; + } + } +} + +// Regression: flushNulls previously wrote null bitmaps by obtaining a raw +// pointer via writePosition() then advancing the stream via seekp(). This +// assumed the pre-allocated IOBufOutputStream had a single contiguous range, +// but StreamArena::newRange caps each range at the size of one allocator run, +// which can be smaller than the requested size. seekp() then failed because +// the target position exceeded the end of the first (and only) range. +// +// Reproducing condition: 16 columns × 10'000 rows × 50% nulls in one +// partition generates enough output (~100 KB) to trigger the run-size cap. +TEST_F( + PrestoIterativePartitioningSerializerTest, + flushNullsBitmapManyColumnsLargeRowCount) { + constexpr int32_t kNumCols = 16; + constexpr int32_t kNumRows = 10'000; + + std::vector names; + std::vector children; + names.reserve(kNumCols); + children.reserve(kNumCols); + + for (int col = 0; col < kNumCols; ++col) { + names.push_back(fmt::format("c{}", col)); + // Rows where (row % 2 == 0) are null; the rest hold (row * kNumCols + col). 
+ children.push_back( + makeFlatVector( + kNumRows, + [col](auto row) { + return static_cast(row * kNumCols + col); + }, + [](auto row) { return (row % 2) == 0; })); + } + + auto input = makeRowVector(names, children); + auto rowType = std::static_pointer_cast(input->type()); + + auto serializer = makeSerializer(rowType, 1); + serializer->append(input, std::vector(kNumRows, 0)); + auto ioBufs = serializer->flush(); + + ASSERT_EQ(ioBufs.size(), 1); + + auto result = deserialize(*ioBufs.at(0).first, rowType); + ASSERT_EQ(result->size(), kNumRows); + + for (int col = 0; col < kNumCols; ++col) { + auto* flat = result->childAt(col)->as>(); + for (int row = 0; row < kNumRows; ++row) { + if ((row % 2) == 0) { + EXPECT_TRUE(result->childAt(col)->isNullAt(row)) + << "col=" << col << " row=" << row; + } else { + ASSERT_FALSE(result->childAt(col)->isNullAt(row)) + << "col=" << col << " row=" << row; + EXPECT_EQ( + flat->valueAt(row), static_cast(row * kNumCols + col)) + << "col=" << col << " row=" << row; + } + } + } +} diff --git a/velox/vector/CMakeLists.txt b/velox/vector/CMakeLists.txt index 10769cc214d..cc8ddfa8b8f 100644 --- a/velox/vector/CMakeLists.txt +++ b/velox/vector/CMakeLists.txt @@ -21,6 +21,7 @@ velox_add_library( FlatMapVector.cpp FlatVector.cpp LazyVector.cpp + PartitionedVector.cpp SelectivityVector.cpp SequenceVector.cpp SimpleVector.cpp diff --git a/velox/vector/PartitionedVector.cpp b/velox/vector/PartitionedVector.cpp new file mode 100644 index 00000000000..bc83840aa9c --- /dev/null +++ b/velox/vector/PartitionedVector.cpp @@ -0,0 +1,477 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "velox/vector/PartitionedVector.h" + +#include "velox/vector/FlatVector.h" + +namespace facebook::velox { + +using Byte = uint8_t; +using BitIndex = uint8_t; + +namespace { + +inline void countPartitionSizes( + const std::vector& partitions, + vector_size_t* rowCounts) { + VELOX_DCHECK_NOT_NULL(rowCounts); + + for (vector_size_t i = 0; i < partitions.size(); i++) { + rowCounts[partitions[i]]++; + } +} + +inline void prefixSum(vector_size_t* offsets, uint32_t numPartitions) { + for (uint32_t i = 1; i < numPartitions; i++) { + offsets[i] += offsets[i - 1]; + } +} + +inline void calculateOffsets( + const std::vector& partitions, + uint32_t numPartitions, + vector_size_t* endPartitionOffsets) { + VELOX_DCHECK_NOT_NULL(endPartitionOffsets); + + if (numPartitions > 1) { + std::fill_n(endPartitionOffsets, numPartitions, 0); + countPartitionSizes(partitions, endPartitionOffsets); + prefixSum(endPartitionOffsets, numPartitions); + } else { + endPartitionOffsets[0] = static_cast(partitions.size()); + } +} + +// endPartitionOffsets is an array of length numPartitions where each entry i is +// the exclusive end position of partition i. cursorPartitionOffsets is +// initialized such that cursorPartitionOffsets[0] = 0 and for i>0, +// cursorPartitionOffsets[i] = endPartitionOffsets[i-1], i.e., the inclusive +// begin positions. 
+void initializeCursorPartitionOffsets(
+    BufferPtr& cursorPartitionOffsets,
+    const BufferPtr& endPartitionOffsets,
+    uint32_t numPartitions,
+    velox::memory::MemoryPool* pool) {
+  VELOX_DCHECK_NOT_NULL(endPartitionOffsets);
+  VELOX_DCHECK_EQ(
+      endPartitionOffsets->size(), numPartitions * sizeof(vector_size_t));
+
+  ensureCapacity<vector_size_t>(cursorPartitionOffsets, numPartitions, pool);
+  auto* rawCursorOffsets = cursorPartitionOffsets->asMutable<vector_size_t>();
+  rawCursorOffsets[0] = 0;
+  // Partition i begins where partition i - 1 ends, so the cursor array is the
+  // end-offset array shifted right by one slot.
+  std::memcpy(
+      &rawCursorOffsets[1],
+      endPartitionOffsets->as<vector_size_t>(),
+      sizeof(vector_size_t) * (numPartitions - 1));
+  cursorPartitionOffsets->setSize(numPartitions * sizeof(vector_size_t));
+}
+
+// In-place partitioning algorithm for fixed-width values
+// This algorithm rearranges elements so that each element ends up in its target
+// partition by repeatedly swapping elements until the current element belongs
+// to the current partition. 'partitions' itself is never permuted: it is only
+// read at positions at or past a partition's cursor.
+template <typename T>
+void partitionFixedWidthValuesInPlace(
+    T* values,
+    const std::vector<uint32_t>& partitions,
+    uint32_t numPartitions,
+    const BufferPtr& endPartitionOffsets,
+    PartitionBuildContext& ctx,
+    velox::memory::MemoryPool* pool) {
+  VELOX_DCHECK_NOT_NULL(values);
+  VELOX_DCHECK_NOT_NULL(endPartitionOffsets);
+  initializeCursorPartitionOffsets(
+      ctx.cursorPartitionOffsets, endPartitionOffsets, numPartitions, pool);
+  auto* rawCursorOffsets =
+      ctx.cursorPartitionOffsets->asMutable<vector_size_t>();
+  const auto* rawEndOffsets = endPartitionOffsets->as<vector_size_t>();
+
+  for (uint32_t currentPartition = 0; currentPartition < numPartitions;
+       currentPartition++) {
+    // 'offset' aliases the cursor of the partition being filled.
+    auto& offset = rawCursorOffsets[currentPartition];
+    const auto endOffset = rawEndOffsets[currentPartition];
+
+    while (offset < endOffset) {
+      uint32_t targetPartition = partitions[offset];
+
+      // Send the value sitting at 'offset' to the next free slot of its own
+      // partition and pull that slot's value back, until the value at
+      // 'offset' belongs to the current partition.
+      while (targetPartition != currentPartition) {
+        const auto destinationOffset = rawCursorOffsets[targetPartition]++;
+        std::swap(values[destinationOffset], values[offset]);
+        targetPartition = partitions[destinationOffset];
+      }
+      ++offset;
+    }
+  }
+}
+
+// Swap two bits between two bytes
+void swapBit(Byte& byte1, BitIndex bit1, Byte& byte2, BitIndex bit2) {
+  // Calculate the difference between the bits: 1 if they differ, 0 otherwise.
+  const Byte bitDiff = ((byte1 >> bit1) & 1) ^ ((byte2 >> bit2) & 1);
+
+  // Apply the difference to toggle the bits. Nothing changes when the two
+  // bits are already equal.
+  byte1 ^= static_cast<Byte>(bitDiff << bit1);
+  byte2 ^= static_cast<Byte>(bitDiff << bit2);
+}
+
+// Bit-packed counterpart of partitionFixedWidthValuesInPlace(): the same
+// cursor-driven swapping, applied to single bits of 'bits'.
+void partitionBitsInPlace(
+    Byte* bits,
+    const std::vector<uint32_t>& partitions,
+    uint32_t numPartitions,
+    PartitionBuildContext& ctx,
+    const BufferPtr& endPartitionOffsets,
+    velox::memory::MemoryPool* pool) {
+  VELOX_DCHECK_NOT_NULL(bits);
+  VELOX_DCHECK_NOT_NULL(endPartitionOffsets);
+  initializeCursorPartitionOffsets(
+      ctx.cursorPartitionOffsets, endPartitionOffsets, numPartitions, pool);
+
+  auto* rawCursorOffsets =
+      ctx.cursorPartitionOffsets->asMutable<vector_size_t>();
+  const auto* rawEndOffsets = endPartitionOffsets->as<vector_size_t>();
+
+  for (uint32_t partition = 0; partition < numPartitions; partition++) {
+    auto& offset = rawCursorOffsets[partition];
+    const auto endOffset = rawEndOffsets[partition];
+    while (offset < endOffset) {
+      uint32_t p = partitions[offset];
+      while (p != partition) {
+        const vector_size_t destinationOffset = rawCursorOffsets[p]++;
+
+        // Calculate the byte address and bit index within the byte for the
+        // source and destination bits. Since each byte contains 8 bits, we
+        // divide the offset by 8 to get the byte address and take the modulus
+        // by 8 to get the bit index within that byte.
+        const vector_size_t destinationAddr = destinationOffset >> 3;
+        const BitIndex destinationBitInByte = destinationOffset & 7;
+        const vector_size_t fromAddr = offset >> 3;
+        const BitIndex fromBitInByte = offset & 7;
+
+        swapBit(
+            bits[destinationAddr],
+            destinationBitInByte,
+            bits[fromAddr],
+            fromBitInByte);
+        p = partitions[destinationOffset];
+      }
+      ++offset;
+    }
+  }
+}
+
+// Partitions the values held in 'inputBuffer', interpreted as an array of T.
+template <typename T>
+void partitionFixedWidthValues(
+    BufferPtr& inputBuffer,
+    const std::vector<uint32_t>& partitions,
+    const BufferPtr& endPartitionOffsets,
+    uint32_t numPartitions,
+    PartitionBuildContext& ctx,
+    velox::memory::MemoryPool* pool) {
+  VELOX_DCHECK_NOT_NULL(inputBuffer);
+
+  auto* input = inputBuffer->asMutable<T>();
+  partitionFixedWidthValuesInPlace<T>(
+      input, partitions, numPartitions, endPartitionOffsets, ctx, pool);
+}
+
+// Boolean values are routed through the bit-level algorithm instead of the
+// element-level one.
+template <>
+void partitionFixedWidthValues<bool>(
+    BufferPtr& inputBuffer,
+    const std::vector<uint32_t>& partitions,
+    const BufferPtr& endPartitionOffsets,
+    uint32_t numPartitions,
+    PartitionBuildContext& ctx,
+    velox::memory::MemoryPool* pool) {
+  VELOX_DCHECK_NOT_NULL(inputBuffer);
+
+  auto* input = inputBuffer->asMutable<Byte>();
+  partitionBitsInPlace(
+      input, partitions, numPartitions, ctx, endPartitionOffsets, pool);
+}
+
+// Wraps 'vector', which must be a FlatVector of the native type of 'Kind',
+// into a PartitionedFlatVector and partitions it.
+template <TypeKind Kind>
+PartitionedVectorPtr createPartitionedFlatVector(
+    const VectorPtr& vector,
+    const std::vector<uint32_t>& partitions,
+    uint32_t numPartitions,
+    const BufferPtr& endPartitionOffsets,
+    PartitionBuildContext& ctx,
+    velox::memory::MemoryPool* pool) {
+  using T = typename TypeTraits<Kind>::NativeType;
+  auto flatVector = std::dynamic_pointer_cast<FlatVector<T>>(vector);
+  VELOX_CHECK_NOT_NULL(flatVector);
+
+  auto partitionedFlatVector = std::make_shared<PartitionedFlatVector<T>>(
+      flatVector, numPartitions, endPartitionOffsets, pool);
+
+  // Always call partition() so that numNullsPerPartition_ is populated,
+  // even when numPartitions == 1 and no data movement is required.
+  partitionedFlatVector->partition(partitions, ctx);
+
+  return partitionedFlatVector;
+}
+
+// Wraps 'vector', which must be a RowVector, into a PartitionedRowVector and
+// partitions it.
+PartitionedVectorPtr createPartitionedRowVector(
+    const VectorPtr& vector,
+    const std::vector<uint32_t>& partitions,
+    uint32_t numPartitions,
+    const BufferPtr& endPartitionOffsets,
+    PartitionBuildContext& ctx,
+    velox::memory::MemoryPool* pool) {
+  auto rowVector = std::dynamic_pointer_cast<RowVector>(vector);
+  VELOX_CHECK_NOT_NULL(rowVector);
+
+  auto partitionedRowVector = std::make_shared<PartitionedRowVector>(
+      rowVector, numPartitions, endPartitionOffsets, pool);
+
+  // Always call partition() to initialize partitionedChildren_, even when
+  // numPartitions == 1, so that partitionAt() can reconstruct the RowVector.
+  partitionedRowVector->partition(partitions, ctx);
+
+  return partitionedRowVector;
+}
+
+} // namespace
+
+PartitionedVector::~PartitionedVector() = default;
+
+PartitionedVectorPtr PartitionedVector::create(
+    const VectorPtr& vector,
+    const std::vector<uint32_t>& partitions,
+    uint32_t numPartitions,
+    PartitionBuildContext& ctx,
+    velox::memory::MemoryPool* pool) {
+  VELOX_CHECK_NOT_NULL(vector);
+  VELOX_CHECK_EQ(vector->size(), partitions.size());
+  VELOX_CHECK_GT(numPartitions, 0);
+  VELOX_CHECK_NOT_NULL(pool);
+
+  // Calculate the end offsets for each partition. For example, if there are 3
+  // partitions with 2, 3, and 1 rows respectively, then endPartitionOffsets[0]
+  // = 2, endPartitionOffsets[1] = 5, and endPartitionOffsets[2] = 6.
+  BufferPtr endPartitionOffsets;
+  ensureCapacity<vector_size_t>(endPartitionOffsets, numPartitions, pool);
+  calculateOffsets(
+      partitions,
+      numPartitions,
+      endPartitionOffsets->asMutable<vector_size_t>());
+  endPartitionOffsets->setSize(numPartitions * sizeof(vector_size_t));
+
+  // The last end offset must account for every input row.
+  const auto* raw = endPartitionOffsets->as<vector_size_t>();
+  VELOX_DCHECK_EQ(raw[numPartitions - 1], partitions.size());
+
+  return create(
+      vector, partitions, numPartitions, endPartitionOffsets, ctx, pool);
+}
+
+PartitionedVectorPtr PartitionedVector::create(
+    const VectorPtr& vector,
+    const std::vector<uint32_t>& partitions,
+    uint32_t numPartitions,
+    const BufferPtr& endPartitionOffsets,
+    PartitionBuildContext& ctx,
+    velox::memory::MemoryPool* pool) {
+  // Row children reach this overload directly, without passing through the
+  // checks of the public overload, so validate the vector here as well.
+  VELOX_CHECK_NOT_NULL(vector);
+  VELOX_CHECK_NOT_NULL(endPartitionOffsets);
+  VELOX_CHECK_EQ(
+      endPartitionOffsets->size(), numPartitions * sizeof(vector_size_t));
+
+  const auto encoding = vector->encoding();
+  const auto typeKind = vector->typeKind();
+
+  switch (encoding) {
+    case VectorEncoding::Simple::FLAT: {
+      auto partitionedFlatVector = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH(
+          createPartitionedFlatVector,
+          typeKind,
+          vector,
+          partitions,
+          numPartitions,
+          endPartitionOffsets,
+          ctx,
+          pool);
+      return partitionedFlatVector;
+    }
+
+    case VectorEncoding::Simple::ROW: {
+      return createPartitionedRowVector(
+          vector, partitions, numPartitions, endPartitionOffsets, ctx, pool);
+    }
+
+    case VectorEncoding::Simple::CONSTANT: {
+      // NOTE(review): partition() is not invoked on this path, so
+      // numNullsAt() keeps the zero counts set by the base constructor.
+      // Confirm that this is intended for a null constant.
+      return std::make_shared<PartitionedConstantVector>(
+          vector, numPartitions, endPartitionOffsets, pool);
+    }
+
+    case VectorEncoding::Simple::ARRAY:
+    case VectorEncoding::Simple::MAP:
+    case VectorEncoding::Simple::DICTIONARY:
+    case VectorEncoding::Simple::BIASED:
+    case VectorEncoding::Simple::SEQUENCE:
+    case VectorEncoding::Simple::LAZY:
+      VELOX_UNSUPPORTED(
+          "Unsupported vector encoding for PartitionedVector: {}",
+          mapSimpleToName(encoding));
+    default:
+      VELOX_UNREACHABLE(
+          "Invalid vector encoding for PartitionedVector: {}", encoding);
+  }
+}
+
+VectorPtr PartitionedVector::baseVector() const {
+  return vector_;
+}
+
+std::string PartitionedVector::toString() const { + std::string offsets; + for (vector_size_t i = 0; i < numPartitions_; ++i) { + if (i > 0) { + offsets += ','; + } + offsets += fmt::format("{}", rawEndPartitionOffsets_[i]); + } + + return fmt::format( + "PartitionedVector[numPartitions: {}, offsets: {}]", + numPartitions_, + offsets); +} + +template +void PartitionedFlatVector::partition( + const std::vector& partitions, + PartitionBuildContext& ctx) { + if (vector_->rawNulls()) { + Byte* rawNulls = reinterpret_cast(vector_->mutableRawNulls()); + partitionBitsInPlace( + rawNulls, partitions, numPartitions_, ctx, endPartitionOffsets_, pool_); + } + + auto valuesBuffer = vector_->as>()->values(); + partitionFixedWidthValues( + valuesBuffer, + partitions, + endPartitionOffsets_, + numPartitions_, + ctx, + pool_); + + // Count nulls per partition from the now-partitioned null bitmap. + if (const uint64_t* rawNulls = vector_->rawNulls()) { + for (uint32_t p = 0; p < numPartitions_; ++p) { + const vector_size_t begin = p == 0 ? 0 : rawEndPartitionOffsets_[p - 1]; + const vector_size_t end = rawEndPartitionOffsets_[p]; + if (begin < end) { + numNullsPerPartition_[p] = + static_cast(bits::countNulls(rawNulls, begin, end)); + } + } + } +} + +template +VectorPtr PartitionedFlatVector::partitionAt(uint32_t partition) const { + VELOX_CHECK_LT(partition, numPartitions_); + + vector_size_t beginOffset = + partition == 0 ? 
0 : rawEndPartitionOffsets_[partition - 1]; + vector_size_t numRowsInPartition = + rawEndPartitionOffsets_[partition] - beginOffset; + + return vector_->slice(beginOffset, numRowsInPartition); +} + +void PartitionedRowVector::partition( + const std::vector& partitions, + PartitionBuildContext& ctx) { + auto* rowVector = vector_->as(); + partitionedChildren_.reserve(rowVector->childrenSize()); + + for (const auto& child : rowVector->children()) { + partitionedChildren_.push_back( + PartitionedVector::create( + child, + partitions, + numPartitions_, + endPartitionOffsets_, + ctx, + pool_)); + } + + if (numPartitions_ > 1 && vector_->rawNulls()) { + Byte* rawNulls = reinterpret_cast(vector_->mutableRawNulls()); + partitionBitsInPlace( + rawNulls, partitions, numPartitions_, ctx, endPartitionOffsets_, pool_); + } + + // Count nulls per partition from the now-partitioned null bitmap. + if (const uint64_t* rawNulls = vector_->rawNulls()) { + for (uint32_t p = 0; p < numPartitions_; ++p) { + const vector_size_t begin = p == 0 ? 0 : rawEndPartitionOffsets_[p - 1]; + const vector_size_t end = rawEndPartitionOffsets_[p]; + if (begin < end) { + numNullsPerPartition_[p] = + static_cast(bits::countNulls(rawNulls, begin, end)); + } + } + } +} + +VectorPtr PartitionedRowVector::partitionAt(uint32_t partition) const { + VELOX_CHECK_LT(partition, numPartitions_); + + vector_size_t beginOffset = + partition == 0 ? 
0 : rawEndPartitionOffsets_[partition - 1]; + vector_size_t numRowsInPartition = + rawEndPartitionOffsets_[partition] - beginOffset; + + std::vector children; + children.reserve(partitionedChildren_.size()); + for (const auto& child : partitionedChildren_) { + children.push_back(child->partitionAt(partition)); + } + + BufferPtr nulls = nullptr; + if (numRowsInPartition > 0 && vector_->rawNulls()) { + nulls = AlignedBuffer::allocate(numRowsInPartition, pool_); + bits::copyBits( + vector_->rawNulls(), + beginOffset, + nulls->asMutable(), + 0, + numRowsInPartition); + } + + return std::make_shared( + pool_, + vector_->type(), + std::move(nulls), + numRowsInPartition, + std::move(children)); +} + +void PartitionedConstantVector::partition( + const std::vector& /*partitions*/, + PartitionBuildContext& /*ctx*/) {} + +VectorPtr PartitionedConstantVector::partitionAt(uint32_t partition) const { + VELOX_CHECK_LT(partition, numPartitions_); + + const vector_size_t beginOffset = + partition == 0 ? 0 : rawEndPartitionOffsets_[partition - 1]; + const vector_size_t numRowsInPartition = + rawEndPartitionOffsets_[partition] - beginOffset; + + return vector_->slice(0, numRowsInPartition); +} + +} // namespace facebook::velox diff --git a/velox/vector/PartitionedVector.h b/velox/vector/PartitionedVector.h new file mode 100644 index 00000000000..eb008f1193b --- /dev/null +++ b/velox/vector/PartitionedVector.h @@ -0,0 +1,314 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include "velox/vector/BaseVector.h" +#include "velox/vector/ComplexVector.h" + +namespace facebook::velox { + +class PartitionedVector; +using PartitionedVectorPtr = std::shared_ptr; + +namespace { + +// TODO: This was copied from dwio::common::BufferUtil.h. However the vector +// module should not depend on dwio. Move this to a common place +template +inline void ensureCapacity( + BufferPtr& data, + size_t numElements, + velox::memory::MemoryPool* pool, + bool preserveOldData = false, + bool clearBits = false) { + size_t oldSize = 0; + size_t newCapacity = BaseVector::byteSize(numElements); + if (!data) { + data = AlignedBuffer::allocate(numElements, pool); + } else { + oldSize = data->size(); + if (!data->isMutable() || data->capacity() < newCapacity) { + auto newData = AlignedBuffer::allocate(numElements, pool); + if (preserveOldData) { + std::memcpy( + newData->template asMutable(), + data->as(), + oldSize); + } + data = newData; + } + } + + if (clearBits && newCapacity > oldSize) { + std::memset( + (void*)(data->asMutable() + oldSize), + 0L, + newCapacity - oldSize); + } +} + +} // namespace + +/// Construction-time context used to build a PartitionedVector. +/// +/// This struct contains only transient execution context needed during +/// construction. None of the fields here define the logical state of +/// PartitionedVector and none are retained after create(). +/// All fields are only valid during the PartitionedVector::create() call. +struct PartitionBuildContext { + BufferPtr cursorPartitionOffsets = nullptr; + + PartitionBuildContext() = default; +}; + +/// PartitionedVector provides an in-place, partition-aware layout of a vector +/// based on per-row partition IDs. 
+/// +/// This is a low-level execution abstraction, analogous to DecodedVector: +/// - it owns partitioning metadata (offsets, indices) +/// - it does not encode operator-specific semantics +/// - it is intended to be reused by multiple exec components +/// (aggregation, sorting, shuffle, etc.) +/// +/// The partitioning operation rearranges rows so that rows belonging to the +/// same partition occupy a contiguous range. +/// +/// Thread-safety: +/// This class is NOT thread-safe. All methods must be called from a single +/// thread. Internal buffers are mutated during create(). +class PartitionedVector { + public: + /// Disable default constructor. + PartitionedVector() = delete; + + /// Disable copy constructor and assignment. + PartitionedVector(const PartitionedVector& other) = delete; + PartitionedVector& operator=(const PartitionedVector& other) = delete; + + // Use default move constructor and move assignment operator. + PartitionedVector(PartitionedVector&&) noexcept = default; + PartitionedVector& operator=(PartitionedVector&&) noexcept = default; + + /// Virtual destructor. + virtual ~PartitionedVector(); + + /// Factory method to create a PartitionedVector. This is the main entry point + /// for constructing a PartitionedVector. The partitioning operation + /// rearranges rows in the base vector so that rows belonging to the same + /// partition occupy a contiguous range. + /// + /// Params: + /// - vector: the base vector to be partitioned. This is modified during + /// partitioning, and becomes the underlying vector of the created + /// PartitionedVector. + /// - partitions: a vector of partition IDs for each row in the base vector. + /// The length of this vector must be the same as the number of rows in the + /// base vector. Each entry must be a value between 0 and numPartitions - 1. + /// - numPartitions: the total number of partitions. This must be greater than + /// 0. + /// - ctx: the context object for building the partitioned vector. 
This + /// contains transient execution context needed during construction, such as + /// intermediate buffers. None of the fields in this context define the + /// logical state of the PartitionedVector, and none are retained after + /// create(). All fields in this context are only valid during the create() + /// call. + /// - pool: the memory pool for allocating any necessary buffers during the + /// creation of the PartitionedVector. + static PartitionedVectorPtr create( + const VectorPtr& vector, + const std::vector& partitions, + uint32_t numPartitions, + PartitionBuildContext& ctx, + velox::memory::MemoryPool* pool); + + /// Returns the underlying vector. + VectorPtr baseVector() const; + + /// Returns the partitioned vector at partition p. If the number of rows in + /// that partition is 0, returns an empty vector. + virtual VectorPtr partitionAt(uint32_t partition) const = 0; + + template + T* as() { + static_assert(std::is_base_of_v); + return dynamic_cast(this); + } + + /// Returns the number of null rows in the given partition. + vector_size_t numNullsAt(uint32_t partition) const { + VELOX_DCHECK_LT(partition, numPartitions_); + return numNullsPerPartition_[partition]; + } + + TypeKind typeKind() const { + return vector_->typeKind(); + } + + vector_size_t* rawPartitionOffsets() { + return rawEndPartitionOffsets_; + } + + virtual const vector_size_t* rawSizes() = 0; + + /// Returns string representation of the value in the specified row. + virtual std::string toString() const; + + protected: + // Internal create method that accepts pre-computed endPartitionOffsets + // buffer. 
+ static PartitionedVectorPtr create( + const VectorPtr& vector, + const std::vector& partitions, + uint32_t numPartitions, + const BufferPtr& partitionOffsetsBuffer, + PartitionBuildContext& ctx, + velox::memory::MemoryPool* pool); + + PartitionedVector( + const VectorPtr& vector, + uint32_t numPartitions, + const BufferPtr& endPartitionOffsets, + velox::memory::MemoryPool* pool) + : vector_(vector), + numPartitions_(numPartitions), + endPartitionOffsets_(endPartitionOffsets), + numNullsPerPartition_(numPartitions, 0), + pool_(pool) { + VELOX_CHECK_NOT_NULL(vector_); + VELOX_CHECK_GT(numPartitions_, 0); + VELOX_CHECK_NOT_NULL(endPartitionOffsets_); + VELOX_CHECK_EQ( + endPartitionOffsets_->size(), numPartitions_ * sizeof(vector_size_t)); + VELOX_CHECK_NOT_NULL(pool_); + + rawEndPartitionOffsets_ = endPartitionOffsets_->asMutable(); + } + + virtual void partition( + const std::vector& partitions, + PartitionBuildContext& ctx) = 0; + + // The base vector that is being partitioned. This is modified during + // partitioning. + VectorPtr vector_; + + // Total number of partitions. This is set at construction and does not change + // during partitioning. It doesn't have const quantifier because we want to + // allow move assignment operator. + uint32_t numPartitions_; + + // The cumulative end row offsets for each partition. For example, if there + // are 3 partitions with 2, 3, and 1 rows respectively, then + // endPartitionOffsets_[0] = 2, endPartitionOffsets_[1] = 5, and + // endPartitionOffsets_[2] = 6. + BufferPtr endPartitionOffsets_; + + // The raw pointer to the endPartitionOffsets_ buffer for easy access during + // partitioning. + vector_size_t* rawEndPartitionOffsets_; + + /// Null row counts per partition, computed during partition(). 
+ std::vector numNullsPerPartition_; + + velox::memory::MemoryPool* pool_; +}; + +using PartitionedVectorPtr = std::shared_ptr; + +template +class PartitionedFlatVector : public PartitionedVector { + public: + PartitionedFlatVector( + const VectorPtr& flatVector, + uint32_t numPartitions, + const BufferPtr& partitionOffsets, + velox::memory::MemoryPool* pool) + : PartitionedVector(flatVector, numPartitions, partitionOffsets, pool) {} + + void partition( + const std::vector& partitions, + PartitionBuildContext& ctx) override; + + VectorPtr partitionAt(uint32_t partition) const override; + + const vector_size_t* rawSizes() override { + VELOX_UNREACHABLE("PartitionedFlatVector does not implement rawSizes()"); + } +}; + +/// Partitions a RowVector in-place so that rows belonging to the same +/// partition occupy a contiguous range. Recursively partitions each child +/// column using PartitionedVector. +class PartitionedRowVector : public PartitionedVector { + public: + PartitionedRowVector( + const VectorPtr& rowVector, + uint32_t numPartitions, + const BufferPtr& partitionOffsets, + velox::memory::MemoryPool* pool) + : PartitionedVector(rowVector, numPartitions, partitionOffsets, pool) {} + + void partition( + const std::vector& partitions, + PartitionBuildContext& ctx) override; + + VectorPtr partitionAt(uint32_t partition) const override; + + /// Returns the partitioned child vector at the given column index. + PartitionedVectorPtr childAt(uint32_t col) const { + VELOX_DCHECK_LT(col, partitionedChildren_.size()); + return partitionedChildren_[col]; + } + + const vector_size_t* rawSizes() override { + VELOX_UNREACHABLE("PartitionedRowVector does not implement rawSizes()"); + } + + private: + /// Partitioned child columns, one per child of the underlying RowVector. + std::vector partitionedChildren_; +}; + +/// Partitions a ConstantVector by reusing the same constant payload and +/// returning constant slices sized to each partition. 
+class PartitionedConstantVector : public PartitionedVector { + public: + PartitionedConstantVector( + const VectorPtr& constantVector, + uint32_t numPartitions, + const BufferPtr& partitionOffsets, + velox::memory::MemoryPool* pool) + : PartitionedVector( + constantVector, + numPartitions, + partitionOffsets, + pool) {} + + void partition( + const std::vector& partitions, + PartitionBuildContext& ctx) override; + + VectorPtr partitionAt(uint32_t partition) const override; + + const vector_size_t* rawSizes() override { + VELOX_UNREACHABLE( + "PartitionedConstantVector does not implement rawSizes()"); + } +}; + +} // namespace facebook::velox diff --git a/velox/vector/benchmarks/CMakeLists.txt b/velox/vector/benchmarks/CMakeLists.txt index 0cb3c78bfd8..8c1840daa1b 100644 --- a/velox/vector/benchmarks/CMakeLists.txt +++ b/velox/vector/benchmarks/CMakeLists.txt @@ -45,3 +45,13 @@ target_link_libraries( gflags::gflags glog::glog ) + +add_executable(velox_vector_partitioned_vector_benchmark PartitionedVectorBenchmark.cpp) +target_link_libraries( + velox_vector_partitioned_vector_benchmark + velox_dwio_common_test_utils + velox_vector + velox_vector_test_lib + Folly::folly + Folly::follybenchmark +) diff --git a/velox/vector/benchmarks/PartitionedVectorBenchmark.cpp b/velox/vector/benchmarks/PartitionedVectorBenchmark.cpp new file mode 100644 index 00000000000..681a2e0c188 --- /dev/null +++ b/velox/vector/benchmarks/PartitionedVectorBenchmark.cpp @@ -0,0 +1,184 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include + +#include "dwio/common/tests/utils/BatchMaker.h" +#include "vector/PartitionedVector.h" + +using namespace facebook::velox; +using namespace facebook::velox::test; + +namespace facebook::velox::test { + +namespace { + +thread_local auto gen = std::mt19937(42); + +auto noNulls = [](vector_size_t) { return false; }; + +auto allNulls = [](vector_size_t) { return true; }; + +auto halfNulls = [](vector_size_t row) { return row % 2 == 0; }; + +template +RowTypePtr scalarTypeGenerator(int32_t numColumns) { + return ROW(std::vector(numColumns, createScalarType())); +} + +RowTypePtr dateTypeGenerator(int32_t numColumns) { + return ROW(std::vector(numColumns, DATE())); +} + +RowTypePtr shortDecimalTypeGenerator(int32_t numColumns) { + return ROW(std::vector(numColumns, DECIMAL(10, 2))); +} + +RowTypePtr longDecimalTypeGenerator(int32_t numColumns) { + return ROW(std::vector(numColumns, DECIMAL(20, 3))); +} + +RowTypePtr mixedFlatTypeGenerator(int32_t numColumns) { + const std::vector typeSelection = { + BOOLEAN(), + TINYINT(), + SMALLINT(), + INTEGER(), + BIGINT(), + HUGEINT(), + REAL(), + DOUBLE(), + TIMESTAMP(), + DATE(), + DECIMAL(10, 2), + DECIMAL(20, 3), + }; + + std::vector types; + types.reserve(numColumns); + + for (int i = 0; i < numColumns; ++i) { + types.push_back(typeSelection[i % typeSelection.size()]); + } + + std::ranges::shuffle(types, gen); + + return ROW(std::move(types)); +} + +auto randomPartitionFunction = [](const RowVectorPtr& vector, + uint32_t numPartitions, + std::vector& partitions) { + partitions.resize(vector->size()); + for (int i = 0; i < vector->size(); ++i) { + partitions[i] = gen() % numPartitions; + } +}; + +std::shared_ptr pool; +std::vector partitions; + +RowVectorPtr createTestVector( + const std::function& rowTypeGenerator, + vector_size_t numRows, + int32_t numColumns, + const 
std::function& isNullAt) { + auto rowType = rowTypeGenerator(numColumns); + const auto batch = BatchMaker::createBatch(rowType, numRows, *pool, isNullAt); + return std::static_pointer_cast(batch); +} + +} // namespace + +void runBM( + uint32_t iterations, + const std::function& rowTypeGenerator, + int32_t numColumns, + uint32_t numPartitions, + const std::function& isNullAt = noNulls, + vector_size_t numRows = 10000) { + folly::BenchmarkSuspender suspender; + PartitionBuildContext ctx; + auto vector = + createTestVector(rowTypeGenerator, numRows, numColumns, isNullAt); + randomPartitionFunction(vector, numPartitions, partitions); + for (uint32_t i = 0; i < iterations; ++i) { + // PartitionedVector::create mutates its input, so each iteration needs a + // fresh copy to keep inputs consistent. + const auto vectorCopy = std::static_pointer_cast( + BaseVector::copy(*vector, pool.get())); + suspender.dismiss(); + PartitionedVector::create( + vectorCopy, partitions, numPartitions, ctx, pool.get()); + suspender.rehire(); + } +} + +#define BENCHMARK_CONFIG(name, generator, numCols, nulls, numParts) \ + BENCHMARK_NAMED_PARAM( \ + runBM, \ + name##_##numCols##Cols_##nulls##_P##numParts, \ + generator, \ + numCols, \ + numParts, \ + nulls); + +#define BENCHMARK_PARTITIONS(name, generator, numCols, nulls) \ + BENCHMARK_CONFIG(name, generator, numCols, nulls, 4) \ + BENCHMARK_CONFIG(name, generator, numCols, nulls, 16) \ + BENCHMARK_CONFIG(name, generator, numCols, nulls, 64) \ + BENCHMARK_CONFIG(name, generator, numCols, nulls, 256) \ + BENCHMARK_CONFIG(name, generator, numCols, nulls, 1024) + +#define BENCHMARK_SIZES(name, generator, nulls) \ + BENCHMARK_PARTITIONS(name, generator, 1, nulls) \ + BENCHMARK_PARTITIONS(name, generator, 10, nulls) \ + BENCHMARK_PARTITIONS(name, generator, 100, nulls) \ + BENCHMARK_PARTITIONS(name, generator, 1000, nulls) + +#define BENCHMARK_TYPE(name, generator) \ + BENCHMARK_SIZES(name, generator, noNulls) \ + BENCHMARK_SIZES(name, generator, 
allNulls) \ + BENCHMARK_SIZES(name, generator, halfNulls) + +BENCHMARK_TYPE(BOOLEAN, scalarTypeGenerator); +BENCHMARK_TYPE(SMALLINT, scalarTypeGenerator); +BENCHMARK_TYPE(INTEGER, scalarTypeGenerator); +BENCHMARK_TYPE(BIGINT, scalarTypeGenerator); +BENCHMARK_TYPE(HUGEINT, scalarTypeGenerator); +BENCHMARK_TYPE(REAL, scalarTypeGenerator); +BENCHMARK_TYPE(DOUBLE, scalarTypeGenerator); +BENCHMARK_TYPE(TIMESTAMP, scalarTypeGenerator); +BENCHMARK_TYPE(VARCHAR, scalarTypeGenerator); +BENCHMARK_TYPE(VARBINARY, scalarTypeGenerator); +BENCHMARK_TYPE(DATE, dateTypeGenerator); +BENCHMARK_TYPE(ShortDecimal, shortDecimalTypeGenerator); +BENCHMARK_TYPE(LongDecimal, longDecimalTypeGenerator); +BENCHMARK_TYPE(Mixed, mixedFlatTypeGenerator); + +} // namespace facebook::velox::test + +int main(int argc, char** argv) { + folly::Init init{&argc, &argv}; + memory::MemoryManager::initialize(memory::MemoryManager::Options{}); + pool = memory::memoryManager()->addLeafPool(); + folly::runBenchmarks(); + return 0; +} diff --git a/velox/vector/tests/CMakeLists.txt b/velox/vector/tests/CMakeLists.txt index 5eb6cf593b6..074530b31fc 100644 --- a/velox/vector/tests/CMakeLists.txt +++ b/velox/vector/tests/CMakeLists.txt @@ -24,6 +24,7 @@ add_executable( IsWritableVectorTest.cpp LazyVectorTest.cpp MayHaveNullsRecursiveTest.cpp + PartitionedVectorTest.cpp SelectivityVectorTest.cpp StringVectorBufferTest.cpp VariantToVectorTest.cpp diff --git a/velox/vector/tests/PartitionedVectorTest.cpp b/velox/vector/tests/PartitionedVectorTest.cpp new file mode 100644 index 00000000000..569a6e6ae9f --- /dev/null +++ b/velox/vector/tests/PartitionedVectorTest.cpp @@ -0,0 +1,416 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include
+#include
+
+#include
+
+#include "velox/vector/PartitionedVector.h"
+#include "velox/vector/tests/utils/PartitionedVectorTestBase.h"
+#include "velox/vector/tests/utils/VectorTestBase.h"
+
+namespace facebook::velox::test {
+
+// Parameterized fixture for PartitionedVector tests. The test parameter is the
+// number of rows of the input vector (see INSTANTIATE_TEST_SUITE_P at the end
+// of this file). Results of PartitionedVector::create() are validated against
+// the dictionary-wrapping reference implementation inherited from
+// PartitionedVectorTestBase.
+class PartitioningVectorTest : public testing::TestWithParam,
+                               public test::PartitionedVectorTestBase {
+ protected:
+  // Seeded from std::random_device, so the random-partition case in
+  // testVectorPartitioning() uses a different assignment on every run.
+  std::mt19937 gen_ = std::mt19937(std::random_device{}());
+
+  // Build context handed to every PartitionedVector::create() call.
+  PartitionBuildContext ctx_;
+
+  static void SetUpTestCase() {
+    memory::MemoryManager::testingSetInstance({});
+  }
+
+  // Partitions `vector` with PartitionedVector::create() and checks each
+  // resulting partition against the reference result.
+  //
+  // @param vector Input to partition. It is handed to create() directly, so
+  // callers pass a copy they do not need afterwards.
+  // @param partitions Partition id of each row of `vector`.
+  // @param numPartitions Total number of partitions; every id in `partitions`
+  // must be smaller than this.
+  //
+  // Both sides are passed through canonicalize() (sorted by value), so the
+  // comparison does not depend on the order of rows inside a partition.
+  void testPartitionedVector(
+      VectorPtr vector,
+      const std::vector& partitions,
+      uint32_t numPartitions) {
+    // Back up the vector before calling PartitionedVector::create()
+    VectorPtr vectorCopy = BaseVector::copy(*vector);
+    // Build the expected vector using the reference implementation
+    std::vector expectedVectors =
+        partitionVectorByWrapping(vectorCopy, partitions, numPartitions);
+
+    // Initialize buffers needed for PartitionedVector::create()
+    ensureCapacity(
+        ctx_.cursorPartitionOffsets, numPartitions, pool_.get());
+
+    // Create the partitioned vector using the actual implementation
+    auto partitionedVector = PartitionedVector::create(
+        vector, partitions, numPartitions, ctx_, pool_.get());
+    VELOX_CHECK_NOT_NULL(partitionedVector);
+
+    // Extract each partition and compare with expected results
+    std::vector partitionedVectors;
+    for (uint32_t i = 0; i < numPartitions; ++i) {
+      auto partition = partitionedVector->partitionAt(i);
+      partitionedVectors.push_back(partition);
+    }
+
+    for (uint32_t i = 0; i < numPartitions; ++i) {
+      test::assertEqualVectors(
+          expectedVectors[i], canonicalize(partitionedVectors[i]));
+    }
+  }
+
+  // Runs testPartitionedVector() on `vector` with a series of partition
+  // layouts: a single partition, round-robin over 2 and 3 partitions, layouts
+  // with an empty first or last partition, one row per partition, and a random
+  // assignment. Each case works on a fresh copy of `vector`.
+  void testVectorPartitioning(VectorPtr vector) {
+    auto numRows = vector->size();
+    std::vector partitions(numRows);
+
+    // Test with single partition
+    std::fill(partitions.begin(), partitions.end(), 0);
+    auto vectorCopy = BaseVector::copy(*vector, pool_.get());
+    testPartitionedVector(vectorCopy, partitions, 1);
+
+    // Test with two partitions
+    if (vector->size() >= 3) {
+      for (uint32_t i = 0; i < partitions.size(); ++i) {
+        partitions[i] = i % 2;
+      }
+      vectorCopy = BaseVector::copy(*vector, pool_.get());
+      testPartitionedVector(vectorCopy, partitions, 2);
+    }
+
+    // Test with three partitions
+    for (uint32_t i = 0; i < partitions.size(); ++i) {
+      partitions[i] = i % 3;
+    }
+    vectorCopy = BaseVector::copy(*vector, pool_.get());
+    testPartitionedVector(vectorCopy, partitions, 3);
+
+    if (vector->size() > 4) {
+      // Test with four partitions where the first partition is empty
+      for (uint32_t i = 0; i < partitions.size(); ++i) {
+        partitions[i] = i % 3 + 1;
+      }
+      vectorCopy = BaseVector::copy(*vector, pool_.get());
+      testPartitionedVector(vectorCopy, partitions, 4);
+
+      // Test with four partitions where the last partition is empty
+      for (uint32_t i = 0; i < partitions.size(); ++i) {
+        partitions[i] = i % 3;
+      }
+      vectorCopy = BaseVector::copy(*vector, pool_.get());
+      testPartitionedVector(vectorCopy, partitions, 4);
+    }
+
+    // Test with one value per partition
+    if (vector->size() > 0) {
+      std::iota(partitions.begin(), partitions.end(), 0);
+      vectorCopy = BaseVector::copy(*vector, pool_.get());
+      testPartitionedVector(vectorCopy, partitions, numRows);
+    }
+
+    // Test with random partitions (number of partitions <= number of values).
+    // std::uniform_int_distribution requires lower <= upper, so the upper
+    // bound is clamped to 0 for an empty vector (the suite runs with 0 rows).
+    // In that case the loop below does not execute and a single, empty
+    // partition is tested.
+    std::uniform_int_distribution<> dis(0, numRows > 0 ? numRows - 1 : 0);
+    uint32_t maxPartition = 0;
+    for (uint32_t i = 0; i < numRows; ++i) {
+      partitions[i] = dis(gen_);
+      maxPartition = std::max(maxPartition, partitions[i]);
+    }
+    vectorCopy = BaseVector::copy(*vector, pool_.get());
+    testPartitionedVector(vectorCopy, partitions, maxPartition + 1);
+  }
+};
+
+TEST_P(PartitioningVectorTest, testFlatVector) {
+  // Number of values in the vector to be partitioned. This is passed as a test
+  // parameter and is used to test different vector sizes, including edge cases
+  // like 0 and 1.
+  const int numValues = GetParam();
+
+  // Row-index values, no nulls
+  testVectorPartitioning(
+      makeFlatVector(numValues, [](auto row) { return row; }));
+
+  // Row-index values, with every other row null
+  testVectorPartitioning(
+      makeFlatVector(
+          numValues, [](auto row) { return row; }, nullEvery(2, 1)));
+
+  // All nulls
+  testVectorPartitioning(makeAllNullFlatVector(numValues));
+}
+
+TEST_P(PartitioningVectorTest, testFlatBoolVector) {
+  const int numValues = GetParam();
+
+  // Alternating true/false values, no nulls
+  testVectorPartitioning(
+      makeFlatVector(numValues, [](auto row) { return row % 2 == 0; }));
+
+  // Alternating true/false values, with every other row null
+  testVectorPartitioning(
+      makeFlatVector(
+          numValues, [](auto row) { return row % 2 == 0; }, nullEvery(2, 1)));
+
+  // All nulls
+  testVectorPartitioning(makeAllNullFlatVector(numValues));
+}
+
+TEST_P(PartitioningVectorTest, testRowVector) {
+  const int numValues = GetParam();
+
+  // Two flat columns, no nulls at any level.
+  testVectorPartitioning(makeRowVector({
+      makeFlatVector(numValues, [](auto row) { return row; }),
+      makeFlatVector(numValues, [](auto row) { return row * 10; }),
+  }));
+
+  // Two flat columns with nullable children.
+  testVectorPartitioning(makeRowVector({
+      makeFlatVector(
+          numValues, [](auto row) { return row; }, nullEvery(2)),
+      makeFlatVector(
+          numValues, [](auto row) { return row * 10; }, nullEvery(3)),
+  }));
+
+  // Row-level nulls with no child nulls.
+  testVectorPartitioning(makeRowVector(
+      {makeFlatVector(numValues, [](auto row) { return row; })},
+      nullEvery(2)));
+
+  // Row-level nulls combined with nullable children.
+  testVectorPartitioning(makeRowVector(
+      {makeFlatVector(
+          numValues, [](auto row) { return row; }, nullEvery(3))},
+      nullEvery(2)));
+
+  // All rows null.
+  testVectorPartitioning(makeRowVector(
+      {makeFlatVector(numValues, [](auto row) { return row; })},
+      [](auto /*row*/) { return true; }));
+
+  // Nested RowVector.
+  testVectorPartitioning(makeRowVector({
+      makeFlatVector(numValues, [](auto row) { return row; }),
+      makeRowVector({
+          makeFlatVector(numValues, [](auto row) { return row; }),
+      }),
+  }));
+}
+
+// Constant inputs: a non-null scalar constant, a null constant, and a constant
+// row value.
+TEST_P(PartitioningVectorTest, testConstantVector) {
+  const int numValues = GetParam();
+
+  testVectorPartitioning(makeConstant(7, numValues));
+  testVectorPartitioning(makeConstant(std::nullopt, numValues));
+  testVectorPartitioning(makeConstantRow(
+      ROW({"c0", "c1"}, {INTEGER(), VARCHAR()}),
+      variant::row({variant(11), variant("constant")}),
+      numValues));
+}
+
+// Partitioning a null-free vector must not allocate a null buffer.
+TEST_P(PartitioningVectorTest, noNullBufferAllocatedForNullFreeFlat) {
+  const int numValues = GetParam();
+  // Nothing to partition for the empty-vector instantiation.
+  if (numValues == 0) {
+    return;
+  }
+
+  auto flat = makeFlatVector(numValues, [](auto row) { return row; });
+  // Precondition: the input itself reports no nulls.
+  ASSERT_FALSE(flat->mayHaveNulls());
+
+  // Alternate rows between partition 0 and partition 1.
+  std::vector partitions(numValues);
+  for (int i = 0; i < numValues; ++i) {
+    partitions[i] = i % 2;
+  }
+
+  auto pv = PartitionedVector::create(flat, partitions, 2, ctx_, pool_.get());
+  // mayHaveNulls() on the partitioned base vector is what this test uses as
+  // the signal that no null buffer was introduced.
+  EXPECT_FALSE(pv->baseVector()->mayHaveNulls())
+      << "partition() must not allocate a null buffer for a null-free FlatVector";
+}
+
+// Partitioning a null-free RowVector must not allocate null buffers on the
+// row vector or any of its children.
+TEST_P(PartitioningVectorTest, noNullBufferAllocatedForNullFreeRow) {
+  const int numValues = GetParam();
+  // Nothing to partition for the empty-vector instantiation.
+  if (numValues == 0) {
+    return;
+  }
+
+  auto row = makeRowVector({
+      makeFlatVector(numValues, [](auto row) { return row; }),
+      makeFlatVector(numValues, [](auto row) { return row * 10; }),
+  });
+  // Preconditions: neither the row vector nor its two children report nulls.
+  ASSERT_FALSE(row->mayHaveNulls());
+  ASSERT_FALSE(row->childAt(0)->mayHaveNulls());
+  ASSERT_FALSE(row->childAt(1)->mayHaveNulls());
+
+  // Alternate rows between partition 0 and partition 1.
+  std::vector partitions(numValues);
+  for (int i = 0; i < numValues; ++i) {
+    partitions[i] = i % 2;
+  }
+
+  auto pv = PartitionedVector::create(row, partitions, 2, ctx_, pool_.get());
+  // The same three checks as above, now on the partitioned base vector.
+  auto* base = pv->baseVector()->as();
+  EXPECT_FALSE(base->mayHaveNulls())
+      << "partition() must not allocate a null buffer for a null-free RowVector";
+  EXPECT_FALSE(base->childAt(0)->mayHaveNulls())
+      << "partition() must not allocate a null buffer for null-free child 0";
+  EXPECT_FALSE(base->childAt(1)->mayHaveNulls())
+      << "partition() must not allocate a null buffer for null-free child 1";
+}
+
+// numNullsAt() tests
+// ---------------------------------------------------------------------------
+
+// A null-free flat vector must report zero nulls for every partition.
+TEST_P(PartitioningVectorTest, numNullsAtFlatNoNulls) {
+  const int numValues = GetParam();
+  auto flat = makeFlatVector(numValues, [](auto row) { return row; });
+
+  // Round-robin the rows over three partitions.
+  std::vector partitions(numValues);
+  for (int i = 0; i < numValues; ++i) {
+    partitions[i] = i % 3;
+  }
+  auto pv = PartitionedVector::create(flat, partitions, 3, ctx_, pool_.get());
+  for (uint32_t p = 0; p < 3; ++p) {
+    EXPECT_EQ(pv->numNullsAt(p), 0) << "partition " << p;
+  }
+}
+
+// A flat vector with every other row null must report the exact per-partition
+// null count. The sum across all partitions must equal the total null count.
+TEST_P(PartitioningVectorTest, numNullsAtFlatSomeNulls) {
+  const int numValues = GetParam();
+  auto flat = makeFlatVector(
+      numValues, [](auto row) { return row; }, nullEvery(2));
+
+  // Round-robin the rows over three partitions.
+  std::vector partitions(numValues);
+  for (int i = 0; i < numValues; ++i) {
+    partitions[i] = i % 3;
+  }
+  auto pv = PartitionedVector::create(flat, partitions, 3, ctx_, pool_.get());
+
+  // Per-partition counts must agree with manual bit-scan of the base vector.
+  // Partition p covers rows [rawOffsets[p - 1], rawOffsets[p]) of the base
+  // vector, with partition 0 starting at row 0. The expected value is derived
+  // from the partitioned output itself, so this loop checks that numNullsAt()
+  // is consistent with the output null bits.
+  const auto* rawNulls = pv->baseVector()->rawNulls();
+  const auto* rawOffsets = pv->rawPartitionOffsets();
+  for (uint32_t p = 0; p < 3; ++p) {
+    const vector_size_t begin = p == 0 ? 0 : rawOffsets[p - 1];
+    const vector_size_t end = rawOffsets[p];
+    // No null buffer on the base vector means zero nulls are expected.
+    const vector_size_t expected = rawNulls
+        ? BaseVector::countNulls(pv->baseVector()->nulls(), begin, end)
+        : 0;
+    EXPECT_EQ(pv->numNullsAt(p), expected) << "partition " << p;
+  }
+
+  // Sum across partitions must equal the total null count in the source vector.
+  const vector_size_t total =
+      pv->numNullsAt(0) + pv->numNullsAt(1) + pv->numNullsAt(2);
+  EXPECT_EQ(total, BaseVector::countNulls(flat->nulls(), 0, numValues));
+}
+
+// An all-null flat vector must report numNullsAt(p) == rows in that partition.
+TEST_P(PartitioningVectorTest, numNullsAtFlatAllNulls) {
+  const int numValues = GetParam();
+  auto flat = makeAllNullFlatVector(numValues);
+
+  // Round-robin the rows over three partitions.
+  std::vector partitions(numValues);
+  for (int i = 0; i < numValues; ++i) {
+    partitions[i] = i % 3;
+  }
+  auto pv = PartitionedVector::create(flat, partitions, 3, ctx_, pool_.get());
+
+  // Partition p covers rows [rawOffsets[p - 1], rawOffsets[p]) of the base
+  // vector, with partition 0 starting at row 0; every one of those rows is
+  // expected to be counted as null.
+  const auto* rawOffsets = pv->rawPartitionOffsets();
+  for (uint32_t p = 0; p < 3; ++p) {
+    const vector_size_t begin = p == 0 ? 0 : rawOffsets[p - 1];
+    const vector_size_t numRowsInPartition = rawOffsets[p] - begin;
+    EXPECT_EQ(pv->numNullsAt(p), numRowsInPartition) << "partition " << p;
+  }
+}
+
+// A row vector with no row-level nulls must report zero per-partition nulls at
+// the row level, even when child columns have nulls.
+TEST_P(PartitioningVectorTest, numNullsAtRowNoRowLevelNulls) {
+  const int numValues = GetParam();
+  // The only child has nulls (nullEvery(2)); the row vector itself has none.
+  auto row = makeRowVector({
+      makeFlatVector(
+          numValues, [](auto row) { return row; }, nullEvery(2)),
+  });
+  ASSERT_FALSE(row->mayHaveNulls());
+
+  // Round-robin the rows over three partitions.
+  std::vector partitions(numValues);
+  for (int i = 0; i < numValues; ++i) {
+    partitions[i] = i % 3;
+  }
+  auto pv = PartitionedVector::create(row, partitions, 3, ctx_, pool_.get());
+  for (uint32_t p = 0; p < 3; ++p) {
+    EXPECT_EQ(pv->numNullsAt(p), 0)
+        << "Row-level numNullsAt() must not count child nulls, partition " << p;
+  }
+}
+
+// A row vector with row-level nulls must report per-partition counts that match
+// a manual bit-scan. Child null counts must be counted independently.
+TEST_P(PartitioningVectorTest, numNullsAtRowRowLevelNulls) {
+  const int numValues = GetParam();
+  // Row-level nulls use nullEvery(2); the single child uses nullEvery(3), so
+  // the two null patterns differ.
+  auto row = makeRowVector(
+      {makeFlatVector(
+          numValues, [](auto row) { return row; }, nullEvery(3))},
+      nullEvery(2));
+
+  // Round-robin the rows over three partitions.
+  std::vector partitions(numValues);
+  for (int i = 0; i < numValues; ++i) {
+    partitions[i] = i % 3;
+  }
+  auto pv = PartitionedVector::create(row, partitions, 3, ctx_, pool_.get());
+
+  // Row level: partition p covers rows [rawOffsets[p - 1], rawOffsets[p]) of
+  // the base vector, with partition 0 starting at row 0. The expected count is
+  // read from the null bits of the partitioned base vector.
+  const auto* rawOffsets = pv->rawPartitionOffsets();
+  for (uint32_t p = 0; p < 3; ++p) {
+    const vector_size_t begin = p == 0 ? 0 : rawOffsets[p - 1];
+    const vector_size_t end = rawOffsets[p];
+    const vector_size_t expected =
+        BaseVector::countNulls(pv->baseVector()->nulls(), begin, end);
+    EXPECT_EQ(pv->numNullsAt(p), expected)
+        << "Row-level null count mismatch, partition " << p;
+  }
+
+  // Child null counts must be tracked independently of row-level nulls.
+  // The same check is repeated on child 0, using the child's own partition
+  // offsets and the null bits of the child's base vector.
+  auto* prv = dynamic_cast(pv.get());
+  ASSERT_NE(prv, nullptr);
+  auto child = prv->childAt(0);
+  const auto* childOffsets = child->rawPartitionOffsets();
+  for (uint32_t p = 0; p < 3; ++p) {
+    const vector_size_t begin = p == 0 ? 0 : childOffsets[p - 1];
+    const vector_size_t end = childOffsets[p];
+    const vector_size_t expected =
+        BaseVector::countNulls(child->baseVector()->nulls(), begin, end);
+    EXPECT_EQ(child->numNullsAt(p), expected)
+        << "Child null count mismatch, partition " << p;
+  }
+}
+
+// Test with different vector sizes, including edge cases like 0 and 1.
+// Every TEST_P in this file runs once per value listed here; the value is the
+// number of rows returned by GetParam().
+INSTANTIATE_TEST_SUITE_P(
+    FlatVectorSizes,
+    PartitioningVectorTest,
+    ::testing::Values(0, 1, 10, 10000));
+
+} // namespace facebook::velox::test
diff --git a/velox/vector/tests/utils/CMakeLists.txt b/velox/vector/tests/utils/CMakeLists.txt
index 4d61ce356e6..a282faa960a 100644
--- a/velox/vector/tests/utils/CMakeLists.txt
+++ b/velox/vector/tests/utils/CMakeLists.txt
@@ -11,7 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # limitations under the License. -add_library(velox_vector_test_lib VectorMaker.cpp VectorTestBase.cpp) +add_library(velox_vector_test_lib PartitionedVectorTestBase.cpp VectorMaker.cpp VectorTestBase.cpp) target_link_libraries( velox_vector_test_lib diff --git a/velox/vector/tests/utils/PartitionedVectorTestBase.cpp b/velox/vector/tests/utils/PartitionedVectorTestBase.cpp new file mode 100644 index 00000000000..6c939dfb569 --- /dev/null +++ b/velox/vector/tests/utils/PartitionedVectorTestBase.cpp @@ -0,0 +1,126 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#include "velox/vector/tests/utils/PartitionedVectorTestBase.h"
+
+#include <algorithm>
+#include <vector>
+
+namespace facebook::velox::test {
+
+// Returns `vector` wrapped in a dictionary whose indices order the rows by
+// value (stable sort using BaseVector::compare with its default flags). Two
+// vectors holding the same multiset of rows therefore canonicalize to vectors
+// that compare equal row by row, regardless of their original row order.
+VectorPtr PartitionedVectorTestBase::canonicalize(VectorPtr vector) {
+  auto numRows = vector->size();
+
+  // Start from the identity mapping 0..numRows-1.
+  auto indices = makeIndices(numRows, [&](auto row) { return row; });
+  vector_size_t* indicesRange = indices->asMutable();
+
+  // Sort the indices based on the vector values
+  std::stable_sort(
+      indicesRange,
+      indicesRange + numRows,
+      [&](vector_size_t left, vector_size_t right) {
+        return vector->compare(vector.get(), left, right) < 0;
+      });
+
+  auto sortedVector = wrapInDictionary(indices, numRows, vector);
+  return sortedVector;
+}
+
+// Reference partitioning used to validate PartitionedVector: for each
+// partition, wraps a copy of `vector` in a dictionary that selects the rows
+// assigned to that partition, then canonicalizes the result.
+//
+// @param vector Vector to partition; it is only read.
+// @param partitions Partition id of each row; must hold at least
+// vector->size() entries, each smaller than `numPartitions`.
+// @param numPartitions Number of partitions to produce.
+// @return One vector per partition. Empty partitions are empty vectors of the
+// input type.
+std::vector PartitionedVectorTestBase::partitionVectorByWrapping(
+    VectorPtr vector,
+    const std::vector& partitions,
+    uint32_t numPartitions) {
+  auto numRows = vector->size();
+
+  // Bucket the row ids by partition in a single pass instead of rescanning all
+  // rows once per partition. Row ids stay in ascending order inside a bucket.
+  std::vector<std::vector<vector_size_t>> rowIdsByPartition(numPartitions);
+  for (vector_size_t row = 0; row < numRows; ++row) {
+    VELOX_CHECK_LT(
+        partitions[row],
+        numPartitions,
+        "Partition id out of range at row {}",
+        row);
+    rowIdsByPartition[partitions[row]].push_back(row);
+  }
+
+  std::vector partitionedVectors(numPartitions, nullptr);
+
+  for (uint32_t p = 0; p < numPartitions; ++p) {
+    const auto& rowIdsInPartition = rowIdsByPartition[p];
+
+    if (rowIdsInPartition.empty()) {
+      partitionedVectors[p] =
+          BaseVector::create(vector->type(), 0, pool_.get());
+      continue;
+    }
+
+    // Create an indices buffer for this partition, filled with the row
+    // indices assigned to it.
+    const auto numRowsInPartition =
+        static_cast<vector_size_t>(rowIdsInPartition.size());
+    auto indices = makeIndices(numRowsInPartition, [&](auto row) {
+      return rowIdsInPartition[row];
+    });
+
+    // Simulate partitioning by building the DictionaryVector with the
+    // partitioned indices.
+    // Copy first so that every partition wraps its own base vector rather
+    // than sharing `vector`.
+    VectorPtr vectorCopy = BaseVector::copy(*vector, pool_.get());
+    auto dictionaryVector = BaseVector::wrapInDictionary(
+        nullptr, indices, numRowsInPartition, vectorCopy);
+    partitionedVectors[p] = canonicalize(dictionaryVector);
+  }
+  return partitionedVectors;
+}
+
+// Merges `rowVectors` into one vector, assigns each row to a partition with
+// `partitionFunction`, and returns the canonicalized reference partitions.
+//
+// @param rowVectors Input batches; must not be empty.
+// @param numPartitions Number of partitions; must be positive. With a single
+// partition every row goes to partition 0 and `partitionFunction` is unused.
+// @param partitionFunction Computes the partition of each merged row; must not
+// be null when numPartitions > 1.
+std::vector PartitionedVectorTestBase::partitionRowVectors(
+    const std::vector& rowVectors,
+    int32_t numPartitions,
+    core::PartitionFunction* partitionFunction) {
+  VELOX_CHECK_GT(numPartitions, 0);
+
+  // mergeVectors() takes generic vector pointers. Convert element by element
+  // instead of reinterpreting the container through a reference cast, which
+  // is undefined behavior for unrelated std::vector specializations.
+  // NOTE(review): assumes mergeVectors() takes std::vector<VectorPtr> - confirm
+  // against the header.
+  const std::vector<VectorPtr> vectors(rowVectors.begin(), rowVectors.end());
+  VectorPtr mergedRowVector = mergeVectors(vectors);
+  auto totalNumRows = mergedRowVector->size();
+
+  std::vector partitions(totalNumRows, 0);
+  if (numPartitions > 1) {
+    VELOX_CHECK_NOT_NULL(partitionFunction);
+    partitionFunction->partition(*mergedRowVector->as(), partitions);
+  }
+
+  std::vector partitionedVectors =
+      partitionVectorByWrapping(mergedRowVector, partitions, numPartitions);
+
+  // partitionVectorByWrapping() already canonicalizes non-empty partitions;
+  // this pass is kept so that the returned vectors have the same shape as
+  // before, including for empty partitions.
+  for (auto& vector : partitionedVectors) {
+    vector = canonicalize(vector);
+  }
+  return partitionedVectors;
+}
+
+// Concatenates `vectors` in order into a newly allocated vector. The inputs
+// are not modified.
+//
+// @param vectors Vectors to merge; must contain at least one element.
+VectorPtr PartitionedVectorTestBase::mergeVectors(
+    const std::vector& vectors) {
+  VELOX_CHECK(
+      !vectors.empty(), "mergeVectors() requires at least one input vector");
+
+  // Start from a copy of the first vector and append the remaining ones.
+  auto mergedVector = BaseVector::copy(*vectors[0]);
+  for (size_t i = 1; i < vectors.size(); ++i) {
+    mergedVector->append(vectors[i].get());
+  }
+
+  return mergedVector;
+}
+
+} // namespace facebook::velox::test
diff --git a/velox/vector/tests/utils/PartitionedVectorTestBase.h b/velox/vector/tests/utils/PartitionedVectorTestBase.h
new file mode 100644
index 00000000000..b2c50761edc
--- /dev/null
+++ b/velox/vector/tests/utils/PartitionedVectorTestBase.h
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "velox/core/PlanNode.h"
+#include "velox/vector/PartitionedVector.h"
+#include "velox/vector/tests/utils/VectorTestBase.h"
+
+namespace facebook::velox::test {
+
+// Test helpers that build reference ("expected") partitions by wrapping the
+// input in dictionaries, for comparison against PartitionedVector output.
+class PartitionedVectorTestBase : public VectorTestBase {
+ protected:
+  // Returns one canonicalized vector per partition, holding the rows of
+  // `vector` whose entry in `partitions` equals that partition's id.
+  std::vector partitionVectorByWrapping(
+      VectorPtr vector,
+      const std::vector& partitions,
+      uint32_t numPartitions);
+
+  // Merges `rowVectors`, partitions the merged rows with `partitionFunction`
+  // and returns the canonicalized reference partitions.
+  std::vector partitionRowVectors(
+      const std::vector& rowVectors,
+      int32_t numPartitions,
+      core::PartitionFunction* partitionFunction);
+
+  // Returns `vector` wrapped in a dictionary that orders its rows by value.
+  VectorPtr canonicalize(VectorPtr vector);
+
+  // Concatenates `vectors` (at least one) into a newly allocated vector.
+  VectorPtr mergeVectors(const std::vector& vectors);
+};
+
+} // namespace facebook::velox::test