Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
76dc41a
feat: Introducing PartitionedVector
yingsu00 Jan 13, 2026
3853bf6
feat: Add PartitionedRowVector implementation
yingsu00 Mar 5, 2026
ff2e34b
refactor: Move initializeCursorPartitionOffsets into partitionFixedWi…
xin-zhang2 Mar 10, 2026
875c92c
fix: Add bool specialization for partitionFixedWidthValues
xin-zhang2 Mar 10, 2026
281a365
fix: Avoid allocating null buffer when partitioning null-free vectors
yingsu00 Mar 12, 2026
6519a8f
feat: Add ParitionedConstantVector implementation
xin-zhang2 Mar 13, 2026
d8f34b4
Add PartitionedVector benchmark
xin-zhang2 Mar 4, 2026
9eafc9d
feat(PartitionedOutput): Add numNullsPerPartition_ to PartitionedVector
yingsu00 Mar 20, 2026
6f09ea9
feat(PartitionedOutput): Add PrestoIterativePartitioningSerializer
yingsu00 Mar 23, 2026
c114147
feat(PartitionedOutput): Add OptimizedPartitionedOutput operator
yingsu00 Apr 1, 2026
211901c
feat(PartitionedOutput): Add normal vs optimized comparison in Exchan…
yingsu00 Apr 10, 2026
e1e10b3
refactor:(PartitionedOutput): Separate local partition exchange bench…
yingsu00 Apr 11, 2026
627bf5d
feat(PartitionedOutput): Add constant support in PrestoIterativeParti…
xin-zhang2 Apr 1, 2026
12f84ef
fix(PartitionedOutput): wire optimized partitioned output to OutputBu…
yingsu00 Apr 21, 2026
ee8a1aa
refactor(PartitionedOutput): rework ExchangeBenchmark inputs and repo…
yingsu00 Apr 12, 2026
b38390f
perf: Introduce OptimizedVectorHasher
yingsu00 May 1, 2026
fceb8bc
feat(PartitionedOutput): fix test failures caused by listenerFactory
xin-zhang2 May 5, 2026
1760e33
feat(PartitionedOutput): Improve PartitionedVectorBenchmark
yingsu00 Apr 22, 2026
bf93e39
fix: Pass Velox CMake build type to Folly
yingsu00 May 9, 2026
aabd0b1
misc: Clean up OptimizedVectorHasherBenchmark.cpp
yingsu00 May 9, 2026
86c1e73
feat(PartitionedOutput): Add BufferState to track bytesBuffered
xin-zhang2 May 5, 2026
38254ae
feat(PartitionedOutput): Add outputChannels support
xin-zhang2 Apr 28, 2026
ecd87e4
perf: Add AVX512 support
yingsu00 May 9, 2026
28588ad
perf: Introduce OptimizedHashPartitionFunction
yingsu00 May 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion scripts/setup-common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ function install_fmt {

function install_folly {
wget_and_untar https://github.com/facebook/folly/archive/refs/tags/"${FB_OS_VERSION}".tar.gz folly
local FOLLY_FLAGS=(-DBUILD_SHARED_LIBS="$VELOX_BUILD_SHARED" -DBUILD_TESTS=OFF -DFOLLY_HAVE_INT128_T=ON)
local FOLLY_FLAGS=(
-DBUILD_SHARED_LIBS="$VELOX_BUILD_SHARED"
-DBUILD_TESTS=OFF
-DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}"
-DFOLLY_HAVE_INT128_T=ON
)
# When folly is static, use static gflags to avoid dual gflags flag
# registration when .so plugins are dlopen'd (both the binary and plugin
# would register the same flags in a shared gflags registry).
Expand Down
17 changes: 13 additions & 4 deletions scripts/setup-helper-functions.sh
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ function github_checkout {
# The values that CPU_ARCH can take are as follows:
# arm64 : Target Apple silicon.
# aarch64: Target general 64 bit arm cpus.
# avx: Target Intel CPUs with AVX.
# avx512: Target Intel CPUs with AVX-512F.
# avx: Target Intel CPUs with AVX2.
# sse: Target Intel CPUs with sse.
# Echo's the appropriate compiler flags which can be captured as so
# CXX_FLAGS=$(get_cxx_flags) or
Expand All @@ -102,7 +103,9 @@ function get_cxx_flags {
else # x86_64
local CPU_CAPABILITIES
CPU_CAPABILITIES=$(sysctl -a | grep machdep.cpu.features | awk '{print tolower($0)}')
if [[ $CPU_CAPABILITIES =~ "avx" ]]; then
if [[ $CPU_CAPABILITIES =~ "avx512f" ]]; then
CPU_ARCH="avx512"
elif [[ $CPU_CAPABILITIES =~ "avx" ]]; then
CPU_ARCH="avx"
else
CPU_ARCH="sse"
Expand All @@ -114,7 +117,9 @@ function get_cxx_flags {
else # x86_64
local CPU_CAPABILITIES
CPU_CAPABILITIES=$(cat /proc/cpuinfo | grep flags | head -n 1 | awk '{print tolower($0)}')
if [[ $CPU_CAPABILITIES =~ "avx" ]]; then
if [[ $CPU_CAPABILITIES =~ "avx512f" ]]; then
CPU_ARCH="avx512"
elif [[ $CPU_CAPABILITIES =~ "avx" ]]; then
CPU_ARCH="avx"
elif [[ $CPU_CAPABILITIES =~ "sse" ]]; then
CPU_ARCH="sse"
Expand All @@ -131,8 +136,12 @@ function get_cxx_flags {
echo -n "-mcpu=apple-m1+crc"
;;

"avx512")
echo -n "-mavx512f -mavx2 -mfma -mavx -mf16c -mlzcnt -mbmi2"
;;

"avx")
echo -n "-mavx2 -mfma -mavx -mf16c -mlzcnt -mbmi2"
echo -n "-mavx2 -mfma -mavx -mf16c -mlzcnt -mbmi2"
;;

"sse")
Expand Down
11 changes: 11 additions & 0 deletions velox/common/process/ProcessBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ DECLARE_bool(avx2); // Enables use of AVX2 when available NOLINT

DECLARE_bool(bmi2); // Enables use of BMI2 when available NOLINT

DECLARE_bool(avx512f);

namespace facebook {
namespace velox {
namespace process {
Expand Down Expand Up @@ -106,6 +108,7 @@ uint64_t threadCpuNanos() {
namespace {
bool bmi2CpuFlag = folly::CpuId().bmi2();
bool avx2CpuFlag = folly::CpuId().avx2();
bool avx512fCpuFlag = folly::CpuId().avx512f();
} // namespace

bool hasAvx2() {
Expand All @@ -124,6 +127,14 @@ bool hasBmi2() {
#endif
}

bool hasAvx512f() {
#ifdef __AVX512F__
return avx512fCpuFlag && FLAGS_avx512f;
#else
return false;
#endif
}

} // namespace process
} // namespace velox
} // namespace facebook
4 changes: 4 additions & 0 deletions velox/common/process/ProcessBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ uint64_t threadCpuNanos();
/// by flag.
bool hasAvx2();

/// True if the machine has Intel AVX512F instructions and these are not
/// disabled by flag.
bool hasAvx512f();

/// True if the machine has Intel BMI2 instructions and these are not disabled
/// by flag.
bool hasBmi2();
Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/hive/HiveConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ void HiveConnector::registerSerDe() {

std::unique_ptr<core::PartitionFunction> HivePartitionFunctionSpec::create(
int numPartitions,
bool localExchange) const {
bool localExchange,
bool /*useOptimizedPartitionFunction*/) const {
std::vector<int> bucketToPartitions;
if (bucketToPartition_.empty()) {
// NOTE: if hive partition function spec doesn't specify bucket to partition
Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/hive/HiveConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ class HivePartitionFunctionSpec : public core::PartitionFunctionSpec {

std::unique_ptr<core::PartitionFunction> create(
int numPartitions,
bool localExchange) const override;
bool localExchange,
bool useOptimizedPartitionFunction = false) const override;

std::string toString() const override;

Expand Down
9 changes: 7 additions & 2 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -2500,9 +2500,13 @@ class PartitionFunctionSpec : public ISerializable {
public:
/// If 'localExchange' is true, the partition function is used for local
/// exchange within a velox task.
/// TODO: useOptimizedPartitionFunction = true is only supported in
/// HashPartitionFunction now. Will extend the optimization to other
/// PartitionFunctions soon.
virtual std::unique_ptr<PartitionFunction> create(
int numPartitions,
bool localExchange = false) const = 0;
bool localExchange = false,
bool useOptimizedPartitionFunction = false) const = 0;

virtual ~PartitionFunctionSpec() = default;

Expand All @@ -2515,7 +2519,8 @@ class GatherPartitionFunctionSpec : public PartitionFunctionSpec {
public:
std::unique_ptr<PartitionFunction> create(
int /*numPartitions*/,
bool /*localExchange*/) const override {
bool /*localExchange*/,
bool /*useOptimizedPartitionFunction*/ = false) const override {
VELOX_UNREACHABLE();
}

Expand Down
1 change: 1 addition & 0 deletions velox/core/QueryConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ const std::vector<config::ConfigProperty>& QueryConfig::registeredProperties() {

// Partitioned output.
VELOX_REGISTER_QUERY_CONFIG(kPartitionedOutputEagerFlush);
VELOX_REGISTER_QUERY_CONFIG(kOptimizedHashPartitionFunctionEnabled);
VELOX_REGISTER_QUERY_CONFIG(kMaxPartitionedOutputBufferSize);
VELOX_REGISTER_QUERY_CONFIG(kMaxOutputBufferSize);

Expand Down
18 changes: 18 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,16 @@ class QueryConfig {
false,
"Flush PartitionedOutput rows eagerly without buffering.")

/// If true, use OptimizedHashPartitionFunction in place of
/// HashPartitionFunction.
VELOX_QUERY_CONFIG(
kOptimizedHashPartitionFunctionEnabled,
optimizedHashPartitionFunctionEnabled,
"optimized_hash_partition_function_enabled",
bool,
false,
"Use OptimizedHashPartitionFunction instead of HashPartitionFunction.")

/// The maximum number of bytes to buffer in PartitionedOutput operator to
/// avoid creating tiny SerializedPages.
VELOX_QUERY_CONFIG(
Expand Down Expand Up @@ -1469,6 +1479,14 @@ class QueryConfig {
1000,
"Batch size threshold for zero-copy in MarkSorted operator.")

VELOX_QUERY_CONFIG(
kOptimizedPartitionedOutputEnabled,
optimizedPartitionedOutputEnabled,
"optimized_repartitioning",
bool,
false,
"Enable OptimizedPartitionedOutput operator.");

// --- Hand-written accessors for properties that need custom logic ---

// Generated by VELOX_QUERY_CONFIG for simple properties above.
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ velox_add_library(
OperatorTraceScan.cpp
OperatorTraceWriter.cpp
OperatorUtils.cpp
OptimizedHashPartitionFunction.cpp
OptimizedPartitionedOutput.cpp
OptimizedVectorHasher.cpp
OrderBy.cpp
OutputBuffer.cpp
OutputBufferManager.cpp
Expand Down Expand Up @@ -177,6 +180,7 @@ velox_add_library(
OperatorTraceWriter.h
OperatorType.h
OperatorUtils.h
OptimizedVectorHasher.h
OrderBy.h
OutputBuffer.h
OutputBufferManager.h
Expand Down
47 changes: 42 additions & 5 deletions velox/exec/HashPartitionFunction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <velox/exec/HashPartitionFunction.h>
#include <velox/exec/VectorHasher.h>
#include "velox/exec/HashPartitionFunction.h"

#include "velox/exec/OptimizedHashPartitionFunction.h"
#include "velox/exec/VectorHasher.h"

#define XXH_INLINE_ALL
#include <xxhash.h> // @manual=third-party//xxHash:xxhash
Expand Down Expand Up @@ -123,9 +125,15 @@ std::optional<uint32_t> HashPartitionFunction::partition(

std::unique_ptr<core::PartitionFunction> HashPartitionFunctionSpec::create(
int numPartitions,
bool localExchange) const {
return std::make_unique<exec::HashPartitionFunction>(
localExchange, numPartitions, inputType_, keyChannels_, constValues_);
bool localExchange,
bool useOptimizedPartitionFunction) const {
return createHashPartitionFunction(
localExchange,
numPartitions,
inputType_,
keyChannels_,
constValues_,
useOptimizedPartitionFunction);
}

std::string HashPartitionFunctionSpec::toString() const {
Expand Down Expand Up @@ -180,4 +188,33 @@ core::PartitionFunctionSpecPtr HashPartitionFunctionSpec::deserialize(
return std::make_shared<HashPartitionFunctionSpec>(
ISerializable::deserialize<RowType>(obj["inputType"]), keys, constValues);
}

std::unique_ptr<HashPartitionFunctionBase> createHashPartitionFunction(
bool localExchange,
int numPartitions,
const RowTypePtr& inputType,
const std::vector<column_index_t>& keyChannels,
const std::vector<VectorPtr>& constValues,
bool useOptimizedPartitionFunction) {
if (useOptimizedPartitionFunction) {
return std::make_unique<OptimizedHashPartitionFunction>(
localExchange, numPartitions, inputType, keyChannels, constValues);
}
return std::make_unique<HashPartitionFunction>(
localExchange, numPartitions, inputType, keyChannels, constValues);
}

std::unique_ptr<HashPartitionFunctionBase> createHashPartitionFunction(
const HashBitRange& hashBitRange,
const RowTypePtr& inputType,
const std::vector<column_index_t>& keyChannels,
const std::vector<VectorPtr>& constValues,
bool useOptimizedPartitionFunction) {
if (useOptimizedPartitionFunction) {
return std::make_unique<OptimizedHashPartitionFunction>(
hashBitRange, inputType, keyChannels, constValues);
}
return std::make_unique<HashPartitionFunction>(
hashBitRange, inputType, keyChannels, constValues);
}
} // namespace facebook::velox::exec
38 changes: 33 additions & 5 deletions velox/exec/HashPartitionFunction.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,28 @@
*/
#pragma once

#include <velox/exec/HashBitRange.h>
#include <velox/exec/VectorHasher.h>
#include "velox/core/PlanNode.h"
#include "velox/exec/HashBitRange.h"
#include "velox/exec/VectorHasher.h"

namespace facebook::velox::exec {

class HashPartitionFunctionBase : public core::PartitionFunction {
public:
~HashPartitionFunctionBase() override = default;

virtual int numPartitions() const = 0;
};

/// Calculates partition number for each row of the specified vector using a
/// hash function. The constructor with hashBitRange parameter requires both
/// hashBitRange and keyChannels to be non-empty. The constructor with
/// numPartitions allows the keyChannels argument to be empty. If keyChannels is
/// empty, then the resulting partition number of partition() will always be
/// zero.
class HashPartitionFunction : public core::PartitionFunction {
/// Extends PartitionFunction with access to the configured number of
/// partitions.
class HashPartitionFunction : public HashPartitionFunctionBase {
public:
HashPartitionFunction(
bool localExchange,
Expand All @@ -48,7 +57,7 @@ class HashPartitionFunction : public core::PartitionFunction {
const RowVector& input,
std::vector<uint32_t>& partitions) override;

int numPartitions() const {
int numPartitions() const override {
return numPartitions_;
}

Expand Down Expand Up @@ -85,7 +94,8 @@ class HashPartitionFunctionSpec : public core::PartitionFunctionSpec {

std::unique_ptr<core::PartitionFunction> create(
int numPartitions,
bool localExchange) const override;
bool localExchange,
bool useOptimizedPartitionFunction = false) const override;

std::string toString() const override;

Expand All @@ -100,4 +110,22 @@ class HashPartitionFunctionSpec : public core::PartitionFunctionSpec {
const std::vector<column_index_t> keyChannels_;
const std::vector<VectorPtr> constValues_;
};

/// Creates either HashPartitionFunction or OptimizedHashPartitionFunction
/// based on 'useOptimizedPartitionFunction'.
std::unique_ptr<HashPartitionFunctionBase> createHashPartitionFunction(
bool localExchange,
int numPartitions,
const RowTypePtr& inputType,
const std::vector<column_index_t>& keyChannels,
const std::vector<VectorPtr>& constValues = {},
bool useOptimizedPartitionFunction = false);

std::unique_ptr<HashPartitionFunctionBase> createHashPartitionFunction(
const HashBitRange& hashBitRange,
const RowTypePtr& inputType,
const std::vector<column_index_t>& keyChannels,
const std::vector<VectorPtr>& constValues = {},
bool useOptimizedPartitionFunction = false);

} // namespace facebook::velox::exec
11 changes: 7 additions & 4 deletions velox/exec/LocalPartition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,10 +339,13 @@ LocalPartition::LocalPartition(
ctx->task->getLocalExchangeQueues(ctx->splitGroupId, planNode->id())},
numPartitions_{queues_.size()},
partitionFunction_(
numPartitions_ == 1 ? nullptr
: planNode->partitionFunctionSpec().create(
numPartitions_,
/*localExchange=*/true)),
numPartitions_ == 1
? nullptr
: planNode->partitionFunctionSpec().create(
numPartitions_,
/*localExchange=*/true,
ctx->queryConfig()
.optimizedHashPartitionFunctionEnabled())),
singlePartitionBufferSize_{
(numPartitions_ <
ctx->queryConfig()
Expand Down
13 changes: 10 additions & 3 deletions velox/exec/LocalPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "velox/exec/NestedLoopJoinBuild.h"
#include "velox/exec/NestedLoopJoinProbe.h"
#include "velox/exec/OperatorTraceScan.h"
#include "velox/exec/OptimizedPartitionedOutput.h"
#include "velox/exec/OrderBy.h"
#include "velox/exec/ParallelProject.h"
#include "velox/exec/PartitionedOutput.h"
Expand Down Expand Up @@ -553,9 +554,15 @@ std::shared_ptr<Driver> DriverFactory::createDriver(
auto partitionedOutputNode =
std::dynamic_pointer_cast<const core::PartitionedOutputNode>(
planNode)) {
operators.push_back(
std::make_unique<PartitionedOutput>(
id, ctx.get(), partitionedOutputNode, eagerFlush(*planNode)));
if (ctx->queryConfig().optimizedPartitionedOutputEnabled()) {
operators.push_back(
std::make_unique<OptimizedPartitionedOutput>(
id, ctx.get(), partitionedOutputNode));
} else {
operators.push_back(
std::make_unique<PartitionedOutput>(
id, ctx.get(), partitionedOutputNode, eagerFlush(*planNode)));
}
} else if (
auto joinNode =
std::dynamic_pointer_cast<const core::HashJoinNode>(planNode)) {
Expand Down
Loading
Loading