// HashPartitionFunction.h -- from a fork of facebookincubator/velox.
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "velox/core/PlanNode.h"
#include "velox/exec/HashBitRange.h"
#include "velox/exec/VectorHasher.h"
namespace facebook::velox::exec {
/// Abstract base for hash-based partition functions. Extends
/// core::PartitionFunction with read access to the configured number of
/// partitions. This is the type returned by the createHashPartitionFunction()
/// factory functions declared in this header.
class HashPartitionFunctionBase : public core::PartitionFunction {
 public:
  ~HashPartitionFunctionBase() override = default;

  /// Returns the number of partitions this function distributes rows across.
  /// Must be implemented by every concrete hash partition function.
  virtual int numPartitions() const = 0;
};
/// Calculates partition number for each row of the specified vector using a
/// hash function. The constructor with hashBitRange parameter requires both
/// hashBitRange and keyChannels to be non-empty. The constructor with
/// numPartitions allows the keyChannels argument to be empty. If keyChannels is
/// empty, then the resulting partition number of partition() will always be
/// zero.
///
/// Implements HashPartitionFunctionBase, i.e. a PartitionFunction that also
/// exposes the configured number of partitions through numPartitions().
class HashPartitionFunction : public HashPartitionFunctionBase {
 public:
  /// Creates a function that partitions rows into 'numPartitions' partitions.
  /// @param localExchange Flag kept in 'localExchange_'; its effect is defined
  /// in the implementation file (NOTE(review): presumably distinguishes local
  /// from remote exchange - confirm).
  /// @param numPartitions Number of partitions, reported by numPartitions().
  /// @param inputType Row type of the vectors passed to partition().
  /// @param keyChannels Indices of the key columns to hash. May be empty.
  /// @param constValues Values for constant keys; see
  /// HashPartitionFunctionSpec for how constant keys are encoded.
  HashPartitionFunction(
      bool localExchange,
      int numPartitions,
      const RowTypePtr& inputType,
      const std::vector<column_index_t>& keyChannels,
      const std::vector<VectorPtr>& constValues = {});

  /// Creates a function driven by a range of hash bits instead of an explicit
  /// partition count. Both 'hashBitRange' and 'keyChannels' must be non-empty.
  /// @param hashBitRange Range of hash bits used to pick the partition.
  /// @param inputType Row type of the vectors passed to partition().
  /// @param keyChannels Indices of the key columns to hash.
  /// @param constValues Values for constant keys; see
  /// HashPartitionFunctionSpec for how constant keys are encoded.
  HashPartitionFunction(
      const HashBitRange& hashBitRange,
      const RowTypePtr& inputType,
      const std::vector<column_index_t>& keyChannels,
      const std::vector<VectorPtr>& constValues = {});

  ~HashPartitionFunction() override = default;

  /// Computes the partition number of every row in 'input'.
  /// @param input Rows to partition.
  /// @param partitions Output vector receiving per-row partition numbers.
  /// @return See core::PartitionFunction::partition() for the meaning of the
  /// optional return value; that contract is not restated in this header.
  std::optional<uint32_t> partition(
      const RowVector& input,
      std::vector<uint32_t>& partitions) override;

  /// Returns the partition count stored in 'numPartitions_'.
  int numPartitions() const override {
    return numPartitions_;
  }

 private:
  // Set-up helper taking the arguments common to both constructors. Defined in
  // the implementation file (NOTE(review): presumably builds 'hashers_' -
  // confirm).
  void init(
      const RowTypePtr& inputType,
      const std::vector<column_index_t>& keyChannels,
      const std::vector<VectorPtr>& constValues);

  // Neither 'localExchange_' nor 'numPartitions_' has a default member
  // initializer, so each constructor must initialize both.
  const bool localExchange_;
  const int numPartitions_;

  // Defaults to std::nullopt, so it is empty unless a constructor sets it.
  const std::optional<HashBitRange> hashBitRange_ = std::nullopt;

  // Hashers owned by this function (NOTE(review): presumably one per key
  // channel - confirm in the implementation file).
  std::vector<std::unique_ptr<VectorHasher>> hashers_;

  // Reusable memory: scratch buffers kept as members so they can be reused
  // across calls.
  SelectivityVector rows_;
  raw_vector<uint64_t> hashes_;
};
/// Factory for HashPartitionFunction instances.
///
/// 'keyChannels' holds the indices of the keys to partition on. When a key is
/// a constant, its entry in 'keyChannels' is 'kConstantChannel' and the
/// constant value itself is stored as a base vector in 'constValues'.
/// Consequently the size of 'constValues' is less than or equal to the size
/// of 'keyChannels'.
class HashPartitionFunctionSpec : public core::PartitionFunctionSpec {
 public:
  /// Takes all arguments by value and moves them into the spec.
  /// @param inputType Row type of the input to be partitioned.
  /// @param keyChannels Key column indices, with 'kConstantChannel' marking
  /// constant keys.
  /// @param constValues Values of the constant keys. Defaults to none.
  HashPartitionFunctionSpec(
      RowTypePtr inputType,
      std::vector<column_index_t> keyChannels,
      std::vector<VectorPtr> constValues = {})
      : inputType_(std::move(inputType)),
        keyChannels_(std::move(keyChannels)),
        constValues_(std::move(constValues)) {}

  /// Builds a partition function from this spec.
  /// @param numPartitions Number of partitions to distribute rows across.
  /// @param localExchange Passed through to the created function.
  /// @param useOptimizedPartitionFunction NOTE(review): presumably selects
  /// the optimized implementation, as for createHashPartitionFunction() -
  /// confirm in the implementation file.
  std::unique_ptr<core::PartitionFunction> create(
      int numPartitions,
      bool localExchange,
      bool useOptimizedPartitionFunction = false) const override;

  /// Returns a string description of this spec.
  std::string toString() const override;

  /// Serializes this spec into a folly::dynamic object.
  folly::dynamic serialize() const override;

  /// Counterpart of serialize(): rebuilds a spec from 'obj'.
  /// @param obj Serialized form of a spec.
  /// @param context Opaque pointer whose use is defined in the implementation
  /// file.
  static core::PartitionFunctionSpecPtr deserialize(
      const folly::dynamic& obj,
      void* context);

 private:
  // All state is fixed at construction time.
  const RowTypePtr inputType_;
  const std::vector<column_index_t> keyChannels_;
  const std::vector<VectorPtr> constValues_;
};
/// Creates either HashPartitionFunction or OptimizedHashPartitionFunction
/// based on 'useOptimizedPartitionFunction'.
///
/// The leading parameters mirror the HashPartitionFunction constructor that
/// takes an explicit partition count.
/// @param localExchange Forwarded to the created partition function.
/// @param numPartitions Number of partitions to distribute rows across.
/// @param inputType Row type of the input to be partitioned.
/// @param keyChannels Indices of the key columns to hash.
/// @param constValues Values for constant keys. Defaults to none.
/// @param useOptimizedPartitionFunction Chooses which of the two
/// implementations is created. Defaults to false.
/// @return The new function, owned by the caller.
std::unique_ptr<HashPartitionFunctionBase> createHashPartitionFunction(
    bool localExchange,
    int numPartitions,
    const RowTypePtr& inputType,
    const std::vector<column_index_t>& keyChannels,
    const std::vector<VectorPtr>& constValues = {},
    bool useOptimizedPartitionFunction = false);
/// Overload of createHashPartitionFunction() that takes a HashBitRange in
/// place of the local-exchange flag and partition count. The leading
/// parameters mirror the HashPartitionFunction constructor that takes a
/// 'hashBitRange'.
/// @param hashBitRange Range of hash bits used to pick the partition.
/// @param inputType Row type of the input to be partitioned.
/// @param keyChannels Indices of the key columns to hash.
/// @param constValues Values for constant keys. Defaults to none.
/// @param useOptimizedPartitionFunction NOTE(review): presumably selects
/// between HashPartitionFunction and OptimizedHashPartitionFunction, as in
/// the other overload - confirm in the implementation file. Defaults to
/// false.
/// @return The new function, owned by the caller.
std::unique_ptr<HashPartitionFunctionBase> createHashPartitionFunction(
    const HashBitRange& hashBitRange,
    const RowTypePtr& inputType,
    const std::vector<column_index_t>& keyChannels,
    const std::vector<VectorPtr>& constValues = {},
    bool useOptimizedPartitionFunction = false);
} // namespace facebook::velox::exec