Skip to content

Commit 61d9860

Browse files
committed
feat(PartitionedOutput): Add StringView support in PrestoIterativePartitioningSerializer
1 parent e3a23d8 commit 61d9860

3 files changed

Lines changed: 330 additions & 3 deletions

File tree

velox/serializers/PrestoIterativePartitioningSerializer.cpp

Lines changed: 187 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,66 @@ simpleColumnBytes(const TypePtr& colType, int64_t numRows, int64_t numNulls) {
127127
(numRows - numNulls) * fixedTypeWidth(colType->kind()); // values
128128
}
129129

130+
int64_t variableWidthColumnBytes(
131+
int64_t numRows,
132+
int64_t numNulls,
133+
int64_t valueBytes) {
134+
return 4 + static_cast<int64_t>(kVariableWidth.size()) + // header
135+
4 + // rowCount
136+
4 * numRows + // offsets
137+
1 + // nullFlag
138+
(numNulls > 0 ? bits::nbytes(numRows) : 0) + // null bitmap
139+
4 + // valueBytes
140+
valueBytes; // values
141+
}
142+
143+
std::vector<int64_t> computeVariableWidthDataSizes(
144+
const std::vector<PartitionedVectorPtr>& partitionedVectors,
145+
uint32_t numPartitions) {
146+
std::vector<int64_t> dataSizes(numPartitions, 0);
147+
for (const auto& partitionedVector : partitionedVectors) {
148+
auto encoding = partitionedVector->baseVector()->encoding();
149+
switch (encoding) {
150+
case VectorEncoding::Simple::FLAT: {
151+
auto* flatVector =
152+
partitionedVector->as<PartitionedFlatVector<StringView>>();
153+
VELOX_DCHECK_NOT_NULL(flatVector);
154+
155+
const auto* rawValues =
156+
flatVector->baseVector()->asFlatVector<StringView>()->rawValues();
157+
const auto* rawNulls =
158+
flatVector->baseVector()->asFlatVector<StringView>()->rawNulls();
159+
const auto* rawPartitionOffsets =
160+
partitionedVector->rawPartitionOffsets();
161+
162+
vector_size_t lastOffset = 0;
163+
for (uint32_t p = 0; p < numPartitions; ++p) {
164+
const auto offset = rawPartitionOffsets[p];
165+
for (auto i = lastOffset; i < offset; ++i) {
166+
if (!rawNulls || !bits::isBitNull(rawNulls, i)) {
167+
dataSizes[p] += rawValues[i].size();
168+
}
169+
}
170+
lastOffset = offset;
171+
}
172+
break;
173+
}
174+
case VectorEncoding::Simple::BIASED:
175+
case VectorEncoding::Simple::CONSTANT:
176+
case VectorEncoding::Simple::DICTIONARY:
177+
case VectorEncoding::Simple::SEQUENCE:
178+
VELOX_NYI(
179+
"Unsupported vector encoding for PrestoIterativePartitioningSerializer: ",
180+
encoding);
181+
default:
182+
VELOX_UNSUPPORTED(
183+
"Invalid vector encoding for PrestoIterativePartitioningSerializer: ",
184+
encoding);
185+
}
186+
}
187+
return dataSizes;
188+
}
189+
130190
/// Returns per-partition exact byte counts for one column (all partitions).
131191
/// Recurses into nested ROW columns.
132192
///
@@ -167,9 +227,17 @@ std::vector<int64_t> computeColumnFlushSizes(
167227
}
168228
break;
169229

170-
case TypeKind::TIMESTAMP:
171230
case TypeKind::VARCHAR:
172-
case TypeKind::VARBINARY:
231+
case TypeKind::VARBINARY: {
232+
const auto dataSizes =
233+
computeVariableWidthDataSizes(columnVectors, numPartitions);
234+
for (uint32_t p : nonEmptyPartitions) {
235+
sizes[p] = variableWidthColumnBytes(
236+
rowsPerPartition[p], nullCounts[p], dataSizes[p]);
237+
}
238+
break;
239+
}
240+
case TypeKind::TIMESTAMP:
173241
case TypeKind::ARRAY:
174242
case TypeKind::MAP:
175243
VELOX_NYI(
@@ -478,9 +546,12 @@ void PrestoIterativePartitioningSerializer::flushColumn(
478546
partitionedVectors, colType, nonEmptyPartitions, outputStreams);
479547
break;
480548

481-
case TypeKind::TIMESTAMP:
482549
case TypeKind::VARCHAR:
483550
case TypeKind::VARBINARY:
551+
flushVariableWidthColumn(
552+
partitionedVectors, colType, nonEmptyPartitions, outputStreams);
553+
break;
554+
case TypeKind::TIMESTAMP:
484555
case TypeKind::ROW:
485556
case TypeKind::ARRAY:
486557
case TypeKind::MAP:
@@ -507,6 +578,25 @@ void PrestoIterativePartitioningSerializer::flushSimpleColumn(
507578
}
508579
}
509580

581+
void PrestoIterativePartitioningSerializer::flushVariableWidthColumn(
582+
const std::vector<PartitionedVectorPtr>& partitionedVectors,
583+
const TypePtr& colType,
584+
const std::vector<uint32_t>& nonEmptyPartitions,
585+
const std::vector<IOBufOutputStream*>& outputStreams) const {
586+
flushHeader(typeToEncodingName(colType), nonEmptyPartitions, outputStreams);
587+
flushRowCounts(nonEmptyPartitions, outputStreams);
588+
const auto dataSizes = flushOffsets(partitionedVectors, outputStreams);
589+
flushNulls(partitionedVectors, nonEmptyPartitions, outputStreams);
590+
591+
for (uint32_t p : nonEmptyPartitions) {
592+
writeInt32(outputStreams[p], dataSizes[p]);
593+
}
594+
595+
for (size_t i = 0; i < partitionedVectors.size(); i++) {
596+
flushSingleVariableWidthVector(partitionedVectors[i], outputStreams);
597+
}
598+
}
599+
510600
template <TypeKind kind>
511601
void PrestoIterativePartitioningSerializer::flushSingleFlatVector(
512602
const PartitionedVectorPtr& partitionedVector,
@@ -590,6 +680,56 @@ void PrestoIterativePartitioningSerializer::flushSingleSimpleVector(
590680
}
591681
}
592682

683+
void PrestoIterativePartitioningSerializer::flushSingleVariableWidthFlatVector(
684+
const PartitionedVectorPtr& partitionedVector,
685+
const std::vector<IOBufOutputStream*>& outputStreams) const {
686+
auto* flatVector = partitionedVector->as<PartitionedFlatVector<StringView>>();
687+
VELOX_DCHECK_NOT_NULL(flatVector);
688+
const auto* rawValues =
689+
flatVector->baseVector()->as<FlatVector<StringView>>()->rawValues();
690+
const auto* rawNulls = flatVector->baseVector()->rawNulls();
691+
const auto* partitionOffsets = flatVector->rawPartitionOffsets();
692+
693+
vector_size_t lastOffset = 0;
694+
for (uint32_t p = 0; p < numPartitions_; ++p) {
695+
const auto offset = partitionOffsets[p];
696+
if (outputStreams[p] != nullptr) {
697+
for (vector_size_t i = lastOffset; i < offset; ++i) {
698+
if (rawNulls && bits::isBitNull(rawNulls, i)) {
699+
continue;
700+
}
701+
const auto& value = rawValues[i];
702+
if (value.size() > 0) {
703+
outputStreams[p]->write(value.data(), value.size());
704+
}
705+
}
706+
lastOffset = offset;
707+
}
708+
}
709+
}
710+
711+
void PrestoIterativePartitioningSerializer::flushSingleVariableWidthVector(
712+
const PartitionedVectorPtr& partitionedVector,
713+
const std::vector<IOBufOutputStream*>& outputStreams) const {
714+
auto encoding = partitionedVector->baseVector()->encoding();
715+
switch (encoding) {
716+
case VectorEncoding::Simple::FLAT:
717+
flushSingleVariableWidthFlatVector(partitionedVector, outputStreams);
718+
break;
719+
case VectorEncoding::Simple::BIASED:
720+
case VectorEncoding::Simple::CONSTANT:
721+
case VectorEncoding::Simple::DICTIONARY:
722+
case VectorEncoding::Simple::SEQUENCE:
723+
VELOX_NYI(
724+
"Unsupported vector encoding for PrestoIterativePartitioningSerializer: ",
725+
encoding);
726+
default:
727+
VELOX_UNSUPPORTED(
728+
"Invalid vector encoding for PrestoIterativePartitioningSerializer: ",
729+
encoding);
730+
}
731+
}
732+
593733
// ---------------------------------------------------------------------------
594734
// Column building blocks
595735
// ---------------------------------------------------------------------------
@@ -729,4 +869,48 @@ void PrestoIterativePartitioningSerializer::flushSequentialOffsets(
729869
}
730870
}
731871

872+
std::vector<int32_t> PrestoIterativePartitioningSerializer::flushOffsets(
873+
const std::vector<PartitionedVectorPtr>& partitionedVectors,
874+
const std::vector<IOBufOutputStream*>& outputStreams) const {
875+
std::vector<int32_t> offsets(numPartitions_, 0);
876+
for (const auto& partitionedVector : partitionedVectors) {
877+
auto encoding = partitionedVector->baseVector()->encoding();
878+
switch (encoding) {
879+
case VectorEncoding::Simple::FLAT: {
880+
auto* flatVector =
881+
partitionedVector->as<PartitionedFlatVector<StringView>>();
882+
VELOX_DCHECK_NOT_NULL(flatVector);
883+
const auto* rawValues =
884+
flatVector->baseVector()->as<FlatVector<StringView>>()->rawValues();
885+
const auto* rawNulls = flatVector->baseVector()->rawNulls();
886+
const auto* partitionOffsets = flatVector->rawPartitionOffsets();
887+
vector_size_t lastOffset = 0;
888+
for (uint32_t p = 0; p < numPartitions_; ++p) {
889+
const auto offset = partitionOffsets[p];
890+
if (outputStreams[p] != nullptr) {
891+
for (vector_size_t i = lastOffset; i < offset; ++i) {
892+
if (!rawNulls || !bits::isBitNull(rawNulls, i)) {
893+
offsets[p] += rawValues[i].size();
894+
}
895+
writeInt32(outputStreams[p], offsets[p]);
896+
}
897+
}
898+
lastOffset = offset;
899+
}
900+
} break;
901+
case VectorEncoding::Simple::BIASED:
902+
case VectorEncoding::Simple::CONSTANT:
903+
case VectorEncoding::Simple::DICTIONARY:
904+
case VectorEncoding::Simple::SEQUENCE:
905+
VELOX_NYI(
906+
"Unsupported vector encoding for PrestoIterativePartitioningSerializer: ",
907+
encoding);
908+
default:
909+
VELOX_UNSUPPORTED(
910+
"Invalid vector encoding for PrestoIterativePartitioningSerializer: ",
911+
encoding);
912+
}
913+
}
914+
return offsets;
915+
}
732916
} // namespace facebook::velox::serializer::presto

velox/serializers/PrestoIterativePartitioningSerializer.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,15 +107,29 @@ class PrestoIterativePartitioningSerializer {
107107
const std::vector<uint32_t>& nonEmptyPartitions,
108108
const std::vector<IOBufOutputStream*>& outputStreams) const;
109109

110+
/// Writes one variable-width column to the per-partition output streams:
/// header, row counts, offsets, nulls, the int32 total value byte count for
/// each non-empty partition, then the value bytes of every partitioned vector.
void flushVariableWidthColumn(
111+
const std::vector<PartitionedVectorPtr>& partitionedVectors,
112+
const TypePtr& colType,
113+
const std::vector<uint32_t>& nonEmptyPartitions,
114+
const std::vector<IOBufOutputStream*>& outputStreams) const;
115+
110116
void flushSingleSimpleVector(
111117
const PartitionedVectorPtr& partitionedVector,
112118
const std::vector<IOBufOutputStream*>& outputStreams) const;
113119

120+
/// Writes the value bytes of one partitioned variable-width vector,
/// dispatching on the base vector's encoding. Only FLAT is implemented;
/// BIASED, CONSTANT, DICTIONARY and SEQUENCE raise VELOX_NYI and any other
/// encoding raises VELOX_UNSUPPORTED.
void flushSingleVariableWidthVector(
121+
const PartitionedVectorPtr& partitionedVector,
122+
const std::vector<IOBufOutputStream*>& outputStreams) const;
123+
114124
template <TypeKind kind>
115125
void flushSingleFlatVector(
116126
const PartitionedVectorPtr& partitionedVector,
117127
const std::vector<IOBufOutputStream*>& outputStreams) const;
118128

129+
/// Writes the bytes of every non-null, non-empty StringView value of a flat
/// partitioned vector to outputStreams[p] of the partition p it belongs to.
/// Partitions whose stream pointer is null are not written.
void flushSingleVariableWidthFlatVector(
130+
const PartitionedVectorPtr& partitionedVector,
131+
const std::vector<IOBufOutputStream*>& outputStreams) const;
132+
119133
void flushHeader(
120134
std::string_view name,
121135
const std::vector<uint32_t>& nonEmptyPartitions,
@@ -141,6 +155,10 @@ class PrestoIterativePartitioningSerializer {
141155
const std::vector<uint32_t>& nonEmptyPartitions,
142156
const std::vector<IOBufOutputStream*>& outputStreams) const;
143157

158+
/// Writes, for each partition with a non-null output stream, one int32 per
/// row holding the running total of that partition's value bytes (null rows
/// add nothing). Returns the final running total of every partition.
std::vector<int32_t> flushOffsets(
159+
const std::vector<PartitionedVectorPtr>& partitionedVectors,
160+
const std::vector<IOBufOutputStream*>& outputStreams) const;
161+
144162
RowTypePtr type_;
145163
uint32_t numPartitions_;
146164
SerdeOpts opts_;

0 commit comments

Comments
 (0)