Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 77 additions & 3 deletions velox/exec/OptimizedPartitionedOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#include "velox/exec/OptimizedPartitionedOutput.h"

#include <unordered_map>

#include "velox/exec/HashPartitionFunction.h"
#include "velox/exec/SerializedPage.h"
#include "velox/exec/Task.h"
Expand Down Expand Up @@ -67,12 +69,15 @@ OptimizedPartitionedOutput::OptimizedPartitionedOutput(
operatorCtx_->driverCtx()->queryConfig().shuffleCompressionKind());
options.minCompressionRatio = 0.8;

initializeSerializerLayout();

serializer_ = std::make_unique<
serializer::presto::PrestoIterativePartitioningSerializer>(
inputType_,
outputType_,
numDestinations_,
options,
pool_,
serializerInputByOutput_,
[bufferManager =
bufferManager_]() -> std::unique_ptr<OutputStreamListener> {
auto lockedBufferManager = bufferManager.lock();
Expand All @@ -87,7 +92,10 @@ void OptimizedPartitionedOutput::addInput(RowVectorPtr input) {
!replicateNullsAndAny_,
"replicateNullsAndAny is not yet supported by OptimizedPartitionedOutput");

if (serializer_->estimateBytesAfterAppend(input) > maxOutputBufferBytes_) {
auto serializerInput = prepareSerializerInput(input);

if (serializer_->estimateBytesAfterAppend(serializerInput) >
maxOutputBufferBytes_) {
flush();
}

Expand All @@ -105,7 +113,7 @@ void OptimizedPartitionedOutput::addInput(RowVectorPtr input) {
}
}

serializer_->append(input, partitions_);
serializer_->append(serializerInput, partitions_);

auto lockedStats = stats_.wlock();
++numAppends_;
Expand Down Expand Up @@ -157,6 +165,72 @@ bool OptimizedPartitionedOutput::isFinished() {
return finished_;
}

void OptimizedPartitionedOutput::initializeSerializerLayout() {
if (outputType_->size() == 0 || outputChannels_.empty()) {
serializerInputType_ = outputType_;
return;
}

std::unordered_map<column_index_t, column_index_t> outputToSerializerInput;
outputToSerializerInput.reserve(outputChannels_.size());

std::vector<std::string> names;
std::vector<TypePtr> types;
names.reserve(outputChannels_.size());
types.reserve(outputChannels_.size());
serializerInputByOutput_.reserve(outputChannels_.size());

for (const auto outputChannel : outputChannels_) {
auto it = outputToSerializerInput.find(outputChannel);
if (it == outputToSerializerInput.end()) {
const auto serializerInputChannel =
static_cast<column_index_t>(serializerInputChannels_.size());
serializerInputChannels_.push_back(outputChannel);
names.push_back(inputType_->nameOf(outputChannel));
types.push_back(inputType_->childAt(outputChannel));
it =
outputToSerializerInput.emplace(outputChannel, serializerInputChannel)
.first;
}
serializerInputByOutput_.push_back(it->second);
}

serializerInputType_ = ROW(std::move(names), std::move(types));
}

RowVectorPtr OptimizedPartitionedOutput::prepareSerializerInput(
const RowVectorPtr& input) const {
VELOX_CHECK_NOT_NULL(input);

if (serializerInputType_->size() == 0) {
return std::make_shared<RowVector>(
input->pool(),
serializerInputType_,
nullptr /*nulls*/,
input->size(),
std::vector<VectorPtr>{});
}

if (serializerInputChannels_.empty()) {
input->loadedVector();
return input;
}

std::vector<VectorPtr> serializerInputColumns;
serializerInputColumns.reserve(serializerInputChannels_.size());
for (auto channel : serializerInputChannels_) {
auto loadedChild = BaseVector::loadedVectorShared(input->childAt(channel));
serializerInputColumns.push_back(loadedChild);
}

return std::make_shared<RowVector>(
input->pool(),
serializerInputType_,
nullptr /*nulls*/,
input->size(),
std::move(serializerInputColumns));
}

void OptimizedPartitionedOutput::flush() {
const auto flushedBytes = serializer_->bytesBuffered();
const auto flushedRows = serializer_->rowsBuffered();
Expand Down
24 changes: 21 additions & 3 deletions velox/exec/OptimizedPartitionedOutput.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,25 @@ class OptimizedPartitionedOutput : public Operator {
bool isFinished() override;

private:
/// Computes the serializer input columns and the mapping from output columns
/// to serializer input columns.
void initializeSerializerLayout();

/// Builds the RowVector consumed by the serializer. When the output layout
/// has duplicated columns, this projects only the distinct columns and
/// leaves duplication to flush time.
RowVectorPtr prepareSerializerInput(const RowVectorPtr& input) const;

/// Serializes all buffered rows into Presto pages and enqueues each page
/// into the output buffer manager. All destinations are always enqueued;
/// sets blockingReason_ and records a future if the output buffer is full.
/// Increments numFlushes_ on each call.
void flush();

const std::string taskId_;
/// Input row type; also used as output type (column reordering not yet
/// applied).
const RowTypePtr inputType_;
const std::vector<column_index_t> keyChannels_;
/// Non-empty when the output column order differs from the input.
/// Non-empty when the output layout differs from the input
const std::vector<column_index_t> outputChannels_;
const int32_t numDestinations_;

Expand All @@ -78,12 +85,23 @@ class OptimizedPartitionedOutput : public Operator {
const int64_t maxOutputBufferBytes_;

velox::memory::MemoryPool* pool_;

/// Computes per-row partition assignments. Null when numDestinations_ == 1.
std::unique_ptr<core::PartitionFunction> partitionFunction_;
/// Reusable buffer for per-row partition assignments.
std::vector<uint32_t> partitions_;

std::unique_ptr<serializer::presto::PrestoIterativePartitioningSerializer>
serializer_;
/// Row type passed to serializer_->append(). It only includes distinct
/// columns from the output layout.
RowTypePtr serializerInputType_;
Comment thread
xin-zhang2 marked this conversation as resolved.
/// Input channels that make up the serializer input type. Empty if the output
/// layout is the same as the input.
std::vector<column_index_t> serializerInputChannels_;
/// For each output column index, store the corresponding serializer input
/// column.
std::vector<column_index_t> serializerInputByOutput_;

BlockingReason blockingReason_{BlockingReason::kNotBlocked};
ContinueFuture future_;
Expand Down
177 changes: 144 additions & 33 deletions velox/exec/tests/OptimizedPartitionedOutputTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,64 @@ class OptimizedPartitionedOutputTest : public OperatorTestBase {
return result;
}

RowTypePtr outputTypeForLayout(
const RowTypePtr& inputType,
const std::vector<std::string>& outputLayout) {
if (outputLayout.empty()) {
return inputType;
}

std::vector<TypePtr> types;
types.reserve(outputLayout.size());
for (const auto& name : outputLayout) {
types.push_back(inputType->findChild(name));
}
return ROW(outputLayout, std::move(types));
}

RowVectorPtr buildOutput(
const RowVectorPtr& input,
const std::vector<std::string>& outputLayout) {
const auto inputType = asRowType(input->type());
const auto outputType = outputTypeForLayout(inputType, outputLayout);

std::vector<VectorPtr> columns;
columns.reserve(outputLayout.size());
for (const auto& name : outputLayout) {
columns.push_back(input->childAt(inputType->getChildIdx(name)));
}
return std::make_shared<RowVector>(
input->pool(), outputType, nullptr, input->size(), std::move(columns));
}

/// Sorts a vector by value for order-independent comparison. Returns a
/// dictionary vector with rows sorted in ascending order.
VectorPtr canonicalize(const VectorPtr& vector) {
const auto numRows = vector->size();
auto indices = makeIndices(numRows, [](auto i) { return i; });
auto* data = indices->asMutable<vector_size_t>();
std::stable_sort(data, data + numRows, [&](auto a, auto b) {
return vector->compare(vector.get(), a, b) < 0;
});
return BaseVector::wrapInDictionary(nullptr, indices, numRows, vector);
}

/// Builds a RowVector by gathering rows from inputBatches at the given
/// (batchIdx, rowIdx) positions. Used to construct the per-partition expected
/// RowVector.
RowVectorPtr gatherRows(
const std::vector<RowVectorPtr>& batches,
const std::vector<std::pair<int, int>>& rowList,
const RowTypePtr& rowType) {
const auto numRows = static_cast<vector_size_t>(rowList.size());
auto result = std::static_pointer_cast<RowVector>(
BaseVector::create(rowType, numRows, pool()));
for (vector_size_t r = 0; r < numRows; ++r) {
result->copy(batches[rowList[r].first].get(), r, rowList[r].second, 1);
}
return result;
}

int64_t getIntRuntimeStat(Task* task, const std::string& statName) {
const auto taskStats = task->taskStats();
const auto& runtimeStats =
Expand All @@ -264,14 +322,34 @@ class OptimizedPartitionedOutputTest : public OperatorTestBase {
int numPartitions,
std::unordered_map<std::string, std::string> extraConfig = {},
std::chrono::seconds timeout = std::chrono::seconds{30}) {
return runPartitionedOutputWithLayout(
taskId,
inputBatches,
partitionKeys,
numPartitions,
{},
std::move(extraConfig),
timeout);
}

PartitionedOutputResult runPartitionedOutputWithLayout(
const std::string& taskId,
const std::vector<RowVectorPtr>& inputBatches,
const std::vector<std::string>& partitionKeys,
int numPartitions,
const std::vector<std::string>& outputLayout,
std::unordered_map<std::string, std::string> extraConfig = {},
std::chrono::seconds timeout = std::chrono::seconds{30}) {
VELOX_CHECK(!inputBatches.empty());
const auto rowType =
std::dynamic_pointer_cast<const RowType>(inputBatches[0]->type());
const auto outputType = outputTypeForLayout(rowType, outputLayout);

auto plan = PlanBuilder()
.values(inputBatches)
.partitionedOutput(partitionKeys, numPartitions)
.planNode();
auto plan =
PlanBuilder()
.values(inputBatches)
.partitionedOutput(partitionKeys, numPartitions, outputLayout)
.planNode();

auto task = Task::create(
taskId,
Expand Down Expand Up @@ -306,7 +384,7 @@ class OptimizedPartitionedOutputTest : public OperatorTestBase {
if (result.pageCounts[p] > 0) {
++result.numNonEmptyPartitions;
}
result.rowCounts[p] = concatPages(result.pages[p], rowType)->size();
result.rowCounts[p] = concatPages(result.pages[p], outputType)->size();
}

result.numAppends = getIntRuntimeStat(task.get(), "numAppends");
Expand Down Expand Up @@ -446,34 +524,6 @@ class OptimizedPartitionedOutputParamTest
return makeRowVector(names, vecs);
}

/// Sorts a vector by value for order-independent comparison. Returns a
/// dictionary vector with rows sorted in ascending order.
VectorPtr canonicalize(const VectorPtr& vector) {
const auto numRows = vector->size();
auto indices = makeIndices(numRows, [](auto i) { return i; });
auto* data = indices->asMutable<vector_size_t>();
std::stable_sort(data, data + numRows, [&](auto a, auto b) {
return vector->compare(vector.get(), a, b) < 0;
});
return BaseVector::wrapInDictionary(nullptr, indices, numRows, vector);
}

/// Builds a RowVector by gathering rows from inputBatches at the given
/// (batchIdx, rowIdx) positions. Used to construct the per-partition expected
/// RowVector.
RowVectorPtr gatherRows(
const std::vector<RowVectorPtr>& batches,
const std::vector<std::pair<int, int>>& rowList,
const RowTypePtr& rowType) {
const auto numRows = static_cast<vector_size_t>(rowList.size());
auto result = std::static_pointer_cast<RowVector>(
BaseVector::create(rowType, numRows, pool()));
for (vector_size_t r = 0; r < numRows; ++r) {
result->copy(batches[rowList[r].first].get(), r, rowList[r].second, 1);
}
return result;
}

/// Verifies that the deserialized pages for each partition exactly match the
/// rows from inputBatches that were routed to that partition. Both expected
/// and actual rows are sorted (canonicalized) before comparison to allow
Expand Down Expand Up @@ -922,4 +972,65 @@ TEST_F(OptimizedPartitionedOutputTest, replicateNullsAndAnyUnsupported) {
"replicateNullsAndAny is not yet supported by OptimizedPartitionedOutput"));
}

TEST_F(OptimizedPartitionedOutputTest, outputLayout) {
auto input = makeRowVector(
{"p1", "v1", "v2", "unused"},
{makeFlatVector<int32_t>({0, 1, 2, 3, 4, 5, 6, 7}),
makeFlatVector<int64_t>({10, 11, 12, 13, 14, 15, 16, 17}),
makeFlatVector<int8_t>({20, 21, 22, 23, 24, 25, 26, 27}),
makeFlatVector<int64_t>({30, 31, 32, 33, 34, 35, 36, 37})});
auto inputCopy =
std::static_pointer_cast<RowVector>(BaseVector::copy(*input, pool()));

const std::vector<std::string> outputLayout = {"v2", "v1"};
const auto inputType = asRowType(input->type());
const auto outputType = outputTypeForLayout(inputType, outputLayout);
auto expected = buildOutput(inputCopy, outputLayout);

auto result = runPartitionedOutputWithLayout(
"local://test-optimized-output-layout", {input}, {}, 1, outputLayout);

auto actual = concatPages(result.pages[0], outputType);
velox::test::assertEqualVectors(expected, actual);
}

TEST_F(OptimizedPartitionedOutputTest, duplicateOutputColumns) {
constexpr int kNumPartitions = 4;
auto input = makeRowVector(
{"p1", "v1"},
{makeFlatVector<int32_t>({0, 1, 2, 3, 0, 1, 2, 3}),
makeFlatVector<int64_t>({10, 11, 12, 13, 14, 15, 16, 17})});
auto inputCopy =
std::static_pointer_cast<RowVector>(BaseVector::copy(*input, pool()));
const std::vector<std::string> outputLayout = {"v1", "v1"};
const auto inputType = asRowType(input->type());
const auto outputType = outputTypeForLayout(inputType, outputLayout);
auto output = buildOutput(inputCopy, outputLayout);

auto result = runPartitionedOutputWithLayout(
"local://test-optimized-output-layout-duplicated-columns",
{input},
{"p1"},
kNumPartitions,
outputLayout);

std::vector<uint32_t> assignments(inputCopy->size());
auto partitionFn = std::make_unique<HashPartitionFunction>(
false, kNumPartitions, inputType, std::vector<column_index_t>{0});
partitionFn->partition(*inputCopy, assignments);

std::vector<std::vector<std::pair<int, int>>> expectedRows(kNumPartitions);
for (vector_size_t i = 0; i < assignments.size(); ++i) {
expectedRows[assignments[i]].emplace_back(0, i);
}

for (int p = 0; p < kNumPartitions; ++p) {
auto expected = gatherRows({output}, expectedRows[p], outputType);
auto actual = concatPages(result.pages[p], outputType);
ASSERT_EQ(expected->size(), actual->size()) << "partition " << p;
velox::test::assertEqualVectors(
canonicalize(expected), canonicalize(actual));
}
}

} // namespace facebook::velox::exec::test
Loading
Loading