Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions velox/experimental/cudf/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ add_library(
CudfOrderBy.cpp
CudfTopN.cpp
CudfTopNRowNumber.cpp
CudfWindow.cpp
DebugUtil.cpp
OperatorAdapters.cpp
ToCudf.cpp
Expand Down
91 changes: 85 additions & 6 deletions velox/experimental/cudf/exec/CudfConversion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@

#include "velox/exec/Driver.h"
#include "velox/exec/Operator.h"
#include "velox/type/Type.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/DecodedVector.h"
#include "velox/vector/FlatVector.h"
#include "velox/vector/SelectivityVector.h"

#include <cudf/copying.hpp>
#include <cudf/table/table.hpp>
Expand Down Expand Up @@ -67,6 +71,51 @@ cudf::size_type preferredGpuBatchSizeRows(
"velox.cudf.gpu_batch_size_rows must be <= max(vector_size_t)");
return batchSize;
}

/// Casts a column to the plan's output type when cuDF/Arrow produced a
/// different physical type (e.g. INTEGER for ROW_NUMBER/COUNT vs BIGINT).
///
/// Only widening of TINYINT / SMALLINT / INTEGER columns to BIGINT is
/// supported; any other mismatch fails via VELOX_UNSUPPORTED.
///
/// @param source Column to convert. It is read through a DecodedVector, so
/// the widening loop does not depend on the column's encoding.
/// @param targetType Type the plan expects for this column.
/// @param pool Memory pool used to allocate the widened result vector.
/// @return `source` itself when its type kind already equals that of
/// `targetType`; otherwise a newly allocated flat BIGINT vector of the same
/// size with the values widened to int64_t and nulls preserved.
VectorPtr castColumnToPlanType(
    const VectorPtr& source,
    const TypePtr& targetType,
    memory::MemoryPool* pool) {
  // Nothing to do when the physical kinds already agree.
  if (source->type()->kindEquals(targetType)) {
    return source;
  }

  const auto& sourceType = source->type();
  const bool isNarrowInteger = sourceType->isInteger() ||
      sourceType->isSmallint() || sourceType->isTinyint();
  if (!(targetType->isBigint() && isNarrowInteger)) {
    VELOX_UNSUPPORTED(
        "CudfToVelox: cannot cast column from {} to {}",
        source->type()->toString(),
        targetType->toString());
  }

  const auto size = source->size();
  DecodedVector decoded(*source);
  auto result =
      BaseVector::create<FlatVector<int64_t>>(BIGINT(), size, pool);

  // Widens every row from the narrow integer type carried by `tag` to
  // int64_t. The tag value itself is unused; only its type matters.
  auto widenRows = [&](auto tag) {
    using Narrow = decltype(tag);
    for (vector_size_t row = 0; row < size; ++row) {
      if (decoded.isNullAt(row)) {
        result->setNull(row, true);
      } else {
        result->set(row, static_cast<int64_t>(decoded.valueAt<Narrow>(row)));
      }
    }
  };

  // The source kind is the same for every row, so dispatch on it once here
  // instead of once per element inside the loop.
  switch (source->typeKind()) {
    case TypeKind::INTEGER:
      widenRows(int32_t{});
      break;
    case TypeKind::SMALLINT:
      widenRows(int16_t{});
      break;
    case TypeKind::TINYINT:
      widenRows(int8_t{});
      break;
    default:
      VELOX_UNREACHABLE();
  }
  return result;
}
} // namespace

CudfFromVelox::CudfFromVelox(
Expand Down Expand Up @@ -226,11 +275,26 @@ RowVectorPtr CudfToVelox::getOutput() {
finished_ = noMoreInput_ && inputs_.empty();
return nullptr;
}
RowVectorPtr output =
with_arrow::toVeloxColumn(tableView, pool(), "", stream);
RowVectorPtr output = with_arrow::toVeloxColumn(
tableView, pool(), outputType_->names(), stream, get_temp_mr());
stream.synchronize();
finished_ = noMoreInput_ && inputs_.empty();
output->setType(outputType_);
if (!output->type()->kindEquals(outputType_)) {
std::vector<VectorPtr> children;
children.reserve(output->childrenSize());
for (column_index_t i = 0; i < output->childrenSize(); ++i) {
children.push_back(castColumnToPlanType(
output->childAt(i), outputType_->childAt(i), pool()));
}
output = std::make_shared<RowVector>(
pool(),
outputType_,
output->nulls(),
output->size(),
std::move(children));
} else {
output->setType(outputType_);
}
// cudfVector goes out of scope here, freeing the GPU memory
return output;
}
Expand Down Expand Up @@ -293,11 +357,26 @@ RowVectorPtr CudfToVelox::getOutput() {
return nullptr;
}

RowVectorPtr output =
with_arrow::toVeloxColumn(resultTable->view(), pool(), "", stream);
RowVectorPtr output = with_arrow::toVeloxColumn(
resultTable->view(), pool(), outputType_->names(), stream, get_temp_mr());
stream.synchronize();
finished_ = noMoreInput_ && inputs_.empty();
output->setType(outputType_);
if (!output->type()->kindEquals(outputType_)) {
std::vector<VectorPtr> children;
children.reserve(output->childrenSize());
for (column_index_t i = 0; i < output->childrenSize(); ++i) {
children.push_back(castColumnToPlanType(
output->childAt(i), outputType_->childAt(i), pool()));
}
output = std::make_shared<RowVector>(
pool(),
outputType_,
output->nulls(),
output->size(),
std::move(children));
} else {
output->setType(outputType_);
}
return output;
}

Expand Down
Loading