Skip to content

Commit 3184cdb

Browse files
committed
Test
1 parent cf376e3 commit 3184cdb

18 files changed

Lines changed: 92 additions & 135 deletions

File tree

backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala

Lines changed: 1 addition & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -327,8 +327,7 @@ class VeloxIcebergSuite extends IcebergSuite {
327327
val lastExecId = statusStore.executionsList().last.executionId
328328
val executionMetrics = statusStore.executionMetrics(lastExecId)
329329

330-
// TODO: fix https://github.com/apache/gluten/issues/11510
331-
assert(executionMetrics(metrics("numWrittenFiles").id).toLong == 0)
330+
assert(executionMetrics(metrics("numWrittenFiles").id).toLong == 1)
332331
}
333332
}
334333

@@ -465,81 +464,4 @@ class VeloxIcebergSuite extends IcebergSuite {
465464
)
466465
}
467466
}
468-
test("iceberg show stats has non-empty min max for store sales table") {
469-
withTable("store_sales_10_rows") {
470-
spark.sql("""
471-
|CREATE TABLE store_sales_10_rows (
472-
| ss_sold_date_sk INT,
473-
| ss_sold_time_sk INT,
474-
| ss_item_sk INT,
475-
| ss_customer_sk INT,
476-
| ss_cdemo_sk INT,
477-
| ss_hdemo_sk INT,
478-
| ss_addr_sk INT,
479-
| ss_store_sk INT,
480-
| ss_promo_sk INT,
481-
| ss_ticket_number BIGINT,
482-
| ss_quantity INT,
483-
| ss_wholesale_cost DECIMAL(7,2),
484-
| ss_list_price DECIMAL(7,2),
485-
| ss_sales_price DECIMAL(7,2),
486-
| ss_ext_discount_amt DECIMAL(7,2),
487-
| ss_ext_sales_price DECIMAL(7,2),
488-
| ss_ext_wholesale_cost DECIMAL(7,2),
489-
| ss_ext_list_price DECIMAL(7,2),
490-
| ss_ext_tax DECIMAL(7,2),
491-
| ss_coupon_amt DECIMAL(7,2),
492-
| ss_net_paid DECIMAL(7,2),
493-
| ss_net_paid_inc_tax DECIMAL(7,2),
494-
| ss_net_profit DECIMAL(7,2)
495-
|) USING iceberg
496-
|""".stripMargin)
497-
498-
spark.sql(
499-
"""
500-
|INSERT INTO store_sales_10_rows VALUES
501-
|(2450899, null, 174781, null, null, 5105, 712262, null, null, 875206344, null, 75.64, 105.13, null, 0.00, null, 3328.16, 4625.72, null, 0.00, null, null, null),
502-
|(2450899, 45381, 240260, 63498438, 1296795, 5105, 712262, 542, 1925, 875206344, 13, 5.12, 7.27, 2.18, 0.00, 28.34, 66.56, 94.51, 1.98, 0.00, 28.34, 30.32, -38.22),
503-
|(2450899, 45381, 360506, 63498438, 1296795, 5105, 712262, 542, 332, 875206344, 69, 36.45, 70.34, 16.17, 0.00, 1115.73, 2515.05, 4853.46, 22.31, 0.00, 1115.73, 1138.04, -1399.32),
504-
|(2450899, 45381, 197360, 63498438, 1296795, 5105, 712262, 542, 1486, 875206344, 50, 92.87, 167.16, 58.50, 0.00, 2925.00, 4643.50, 8358.00, 204.75, 0.00, 2925.00, 3129.75, -1718.50),
505-
|(2450899, 45381, 58255, 63498438, 1296795, 5105, 712262, 542, 359, 875206344, 100, 85.99, 105.76, 47.59, 523.49, 4759.00, 8599.00, 10576.00, 296.48, 523.49, 4235.51, 4531.99, -4363.49),
506-
|(2450899, 45381, 219500, 63498438, 1296795, 5105, 712262, 542, 8, 875206344, 77, 80.61, 121.72, 26.77, 2020.06, 2061.29, 6206.97, 9372.44, 1.64, 2020.06, 41.23, 42.87, -6165.74),
507-
|(2450899, 45381, 60157, 63498438, 1296795, 5105, 712262, 542, 484, 875206344, 9, 44.58, 62.85, 50.28, 0.00, 452.52, 401.22, 565.65, 27.15, 0.00, 452.52, 479.67, 51.30),
508-
|(2450899, 45381, 132362, 63498438, 1296795, 5105, 712262, 542, 1575, 875206344, 86, 30.79, 31.71, 25.68, 0.00, 2208.48, 2647.94, 2727.06, 22.08, 0.00, 2208.48, 2230.56, -439.46),
509-
|(2450899, 45381, 41590, 63498438, 1296795, 5105, 712262, 542, 441, 875206344, 40, 79.99, 137.58, 12.38, 0.00, 495.20, 3199.60, 5503.20, 9.90, 0.00, 495.20, 505.10, -2704.40)
510-
|""".stripMargin)
511-
512-
spark.sql("ANALYZE store_sales_10_rows")
513-
514-
withSQLConf("spark.sql.statistics.ignoreStatsCalculatorFailures" -> "false") {
515-
val stats = spark.sql("SHOW STATS FOR store_sales_10_rows")
516-
517-
val statsByColumn =
518-
stats.collect().map(row => row.getAs[String]("column_name") -> row).toMap
519-
520-
Seq(
521-
"ss_sold_date_sk",
522-
"ss_item_sk",
523-
"ss_ticket_number",
524-
"ss_quantity",
525-
"ss_wholesale_cost",
526-
"ss_list_price",
527-
"ss_sales_price",
528-
"ss_ext_sales_price",
529-
"ss_net_profit"
530-
).foreach {
531-
colName =>
532-
val row = statsByColumn(colName)
533-
534-
assert(
535-
Option(row.getAs[Any]("min")).exists(_.toString.nonEmpty),
536-
s"Expected non-empty min for $colName in SHOW STATS output: $row")
537-
538-
assert(
539-
Option(row.getAs[Any]("max")).exists(_.toString.nonEmpty),
540-
s"Expected non-empty max for $colName in SHOW STATS output: $row")
541-
}
542-
}
543-
}
544-
}
545467
}

cpp/velox/CMakeLists.txt

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -223,10 +223,8 @@ if(ENABLE_GPU)
223223
memory/GpuBufferColumnarBatch.cc)
224224
endif()
225225

226-
if(ENABLE_ENHANCED_FEATURES)
227-
list(APPEND VELOX_SRCS compute/iceberg/IcebergFormat.cc
228-
compute/iceberg/IcebergWriter.cc)
229-
endif()
226+
list(APPEND VELOX_SRCS compute/iceberg/IcebergFormat.cc
227+
compute/iceberg/IcebergWriter.cc)
230228

231229
if(BUILD_TESTS OR BUILD_BENCHMARKS)
232230
set(BUILD_TEST_UTILS ON)

cpp/velox/compute/VeloxBackend.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
#include "velox/connectors/hive/BufferedInputBuilder.h"
5252
#include "velox/connectors/hive/HiveConnector.h"
5353
#include "velox/connectors/hive/HiveDataSource.h"
54+
#include "velox/connectors/hive/iceberg/IcebergConnector.h"
5455
#include "velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.h" // @manual
5556
#include "velox/connectors/hive/storage_adapters/gcs/RegisterGcsFileSystem.h" // @manual
5657
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h"
@@ -323,6 +324,13 @@ std::shared_ptr<facebook::velox::connector::Connector> VeloxBackend::createHiveC
323324
return std::make_shared<velox::connector::hive::HiveConnector>(connectorId, hiveConnectorConfig_, ioExecutor);
324325
}
325326

327+
std::shared_ptr<facebook::velox::connector::Connector> VeloxBackend::createIcebergConnector(
328+
const std::string& connectorId,
329+
folly::Executor* ioExecutor) const {
330+
return std::make_shared<velox::connector::hive::iceberg::IcebergConnector>(
331+
connectorId, hiveConnectorConfig_, ioExecutor);
332+
}
333+
326334
std::shared_ptr<facebook::velox::connector::Connector> VeloxBackend::createValueStreamConnector(
327335
const std::string& connectorId,
328336
bool dynamicFilterEnabled) const {

cpp/velox/compute/VeloxBackend.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ class VeloxBackend {
7474
const std::string& connectorId,
7575
folly::Executor* ioExecutor) const;
7676

77+
std::shared_ptr<facebook::velox::connector::Connector> createIcebergConnector(
78+
const std::string& connectorId,
79+
folly::Executor* ioExecutor) const;
80+
7781
std::shared_ptr<facebook::velox::connector::Connector> createValueStreamConnector(
7882
const std::string& connectorId,
7983
bool dynamicFilterEnabled) const;

cpp/velox/compute/VeloxConnectorIds.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@ namespace gluten {
2323

2424
struct VeloxConnectorIds {
2525
std::string hive;
26+
std::string iceberg;
2627
std::string iterator;
2728
std::string cudfHive;
2829
bool hiveRegistered{false};
30+
bool icebergRegistered{false};
2931
bool iteratorRegistered{false};
3032
bool cudfHiveRegistered{false};
3133
};

cpp/velox/compute/VeloxRuntime.cc

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ std::string makeScopedConnectorId(const std::string& base, uint64_t runtimeId) {
213213
VeloxConnectorIds makeScopedConnectorIds(uint64_t runtimeId) {
214214
return VeloxConnectorIds{
215215
.hive = makeScopedConnectorId(kHiveConnectorId, runtimeId),
216+
.iceberg = makeScopedConnectorId(kIcebergConnectorId, runtimeId),
216217
.iterator = makeScopedConnectorId(kIteratorConnectorId, runtimeId),
217218
.cudfHive = makeScopedConnectorId(kCudfHiveConnectorId, runtimeId)};
218219
}
@@ -271,6 +272,14 @@ void VeloxRuntime::registerConnectors() {
271272
velox::connector::hasConnector(connectorIds_.hive),
272273
"Scoped hive connector not found after registration: " + connectorIds_.hive);
273274

275+
connectorIds_.icebergRegistered =
276+
velox::connector::registerConnector(backend->createIcebergConnector(connectorIds_.iceberg, ioExecutor_.get()));
277+
GLUTEN_CHECK(
278+
connectorIds_.icebergRegistered, "Failed to register scoped Iceberg connector: " + connectorIds_.iceberg);
279+
GLUTEN_CHECK(
280+
velox::connector::hasConnector(connectorIds_.iceberg),
281+
"Scoped Iceberg connector not found after registration: " + connectorIds_.iceberg);
282+
274283
const auto valueStreamDynamicFilterEnabled =
275284
veloxCfg_->get<bool>(kValueStreamDynamicFilterEnabled, kValueStreamDynamicFilterEnabledDefault);
276285
connectorIds_.iteratorRegistered = velox::connector::registerConnector(
@@ -310,6 +319,10 @@ void VeloxRuntime::unregisterConnectors() {
310319
velox::connector::unregisterConnector(connectorIds_.hive);
311320
connectorIds_.hiveRegistered = false;
312321
}
322+
if (connectorIds_.icebergRegistered) {
323+
velox::connector::unregisterConnector(connectorIds_.iceberg);
324+
connectorIds_.icebergRegistered = false;
325+
}
313326
}
314327

315328
void VeloxRuntime::parsePlan(const uint8_t* data, int32_t size) {
@@ -488,7 +501,6 @@ std::shared_ptr<RowToColumnarConverter> VeloxRuntime::createRow2ColumnarConverte
488501
return std::make_shared<VeloxRowToColumnarConverter>(cSchema, veloxPool);
489502
}
490503

491-
#ifdef GLUTEN_ENABLE_ENHANCED_FEATURES
492504
std::shared_ptr<IcebergWriter> VeloxRuntime::createIcebergWriter(
493505
RowTypePtr rowType,
494506
int32_t format,
@@ -516,7 +528,6 @@ std::shared_ptr<IcebergWriter> VeloxRuntime::createIcebergWriter(
516528
veloxPool,
517529
connectorPool);
518530
}
519-
#endif
520531

521532
std::shared_ptr<ShuffleWriter> VeloxRuntime::createShuffleWriter(
522533
int32_t numPartitions,

cpp/velox/compute/VeloxRuntime.h

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,15 @@
2020
#include "WholeStageResultIterator.h"
2121
#include "compute/Runtime.h"
2222
#include "compute/VeloxConnectorIds.h"
23-
#ifdef GLUTEN_ENABLE_ENHANCED_FEATURES
2423
#include "iceberg/IcebergWriter.h"
25-
#endif
2624
#include <folly/Executor.h>
2725
#include "memory/VeloxMemoryManager.h"
2826
#include "operators/serializer/VeloxColumnarBatchSerializer.h"
2927
#include "operators/serializer/VeloxColumnarToRowConverter.h"
3028
#include "operators/writer/VeloxParquetDataSource.h"
3129
#include "shuffle/ShuffleReader.h"
3230
#include "shuffle/ShuffleWriter.h"
33-
34-
#ifdef GLUTEN_ENABLE_ENHANCED_FEATURES
3531
#include "IcebergNestedField.pb.h"
36-
#endif
3732

3833
namespace gluten {
3934

@@ -76,7 +71,6 @@ class VeloxRuntime final : public Runtime {
7671

7772
std::shared_ptr<RowToColumnarConverter> createRow2ColumnarConverter(struct ArrowSchema* cSchema) override;
7873

79-
#ifdef GLUTEN_ENABLE_ENHANCED_FEATURES
8074
std::shared_ptr<IcebergWriter> createIcebergWriter(
8175
RowTypePtr rowType,
8276
int32_t format,
@@ -88,7 +82,6 @@ class VeloxRuntime final : public Runtime {
8882
std::shared_ptr<const facebook::velox::connector::hive::iceberg::IcebergPartitionSpec> spec,
8983
const gluten::IcebergNestedField& protoField,
9084
const std::unordered_map<std::string, std::string>& sparkConfs);
91-
#endif
9285

9386
std::shared_ptr<ShuffleWriter> createShuffleWriter(
9487
int numPartitions,

cpp/velox/compute/WholeStageResultIterator.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ WholeStageResultIterator::WholeStageResultIterator(
161161
std::unordered_map<std::string, std::string> customSplitInfo{{"table_format", "hive-iceberg"}};
162162
auto deleteFiles = icebergSplitInfo->deleteFilesVec[idx];
163163
split = std::make_shared<velox::connector::hive::iceberg::HiveIcebergSplit>(
164-
connectorIds_.hive,
164+
connectorIds_.iceberg,
165165
paths[idx],
166166
format,
167167
starts[idx],
@@ -216,6 +216,7 @@ std::shared_ptr<velox::core::QueryCtx> WholeStageResultIterator::createNewVeloxQ
216216
std::unordered_map<std::string, std::shared_ptr<velox::config::ConfigBase>> connectorConfigs;
217217
auto hiveSessionConfig = createHiveConnectorSessionConfig(veloxCfg_);
218218
connectorConfigs[connectorIds_.hive] = hiveSessionConfig;
219+
connectorConfigs[connectorIds_.iceberg] = hiveSessionConfig;
219220
connectorConfigs[connectorIds_.iterator] = hiveSessionConfig;
220221
#ifdef GLUTEN_ENABLE_GPU
221222
if (!connectorIds_.cudfHive.empty()) {

cpp/velox/compute/iceberg/IcebergWriter.cc

Lines changed: 39 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717

1818
#include "IcebergWriter.h"
1919

20+
#include "IcebergNestedField.pb.h"
2021
#include "IcebergPartitionSpec.pb.h"
2122
#include "compute/ProtobufUtils.h"
23+
#include "compute/VeloxBackend.h"
2224
#include "compute/iceberg/IcebergFormat.h"
2325
#include "config/VeloxConfig.h"
2426
#include "utils/ConfigExtractor.h"
@@ -99,9 +101,9 @@ class GlutenIcebergFileNameGenerator : public connector::hive::FileNameGenerator
99101
mutable int32_t fileCount_;
100102
};
101103

102-
iceberg::IcebergNestedField convertToIcebergNestedField(const gluten::IcebergNestedField& protoField) {
103-
IcebergNestedField result;
104-
result.id = protoField.id();
104+
parquet::ParquetFieldId convertToIcebergNestedField(const gluten::IcebergNestedField& protoField) {
105+
parquet::ParquetFieldId result;
106+
result.fieldId = protoField.id();
105107

106108
// Recursively convert children
107109
result.children.reserve(protoField.children_size());
@@ -121,7 +123,7 @@ std::shared_ptr<IcebergInsertTableHandle> createIcebergInsertTableHandle(
121123
int64_t taskId,
122124
const std::string& operationId,
123125
std::shared_ptr<const IcebergPartitionSpec> spec,
124-
const iceberg::IcebergNestedField& nestedField,
126+
const parquet::ParquetFieldId& nestedField,
125127
facebook::velox::memory::MemoryPool* pool) {
126128
std::vector<std::shared_ptr<const iceberg::IcebergColumnHandle>> columnHandles;
127129

@@ -139,14 +141,12 @@ std::shared_ptr<IcebergInsertTableHandle> createIcebergInsertTableHandle(
139141
columnNames.at(i),
140142
connector::hive::HiveColumnHandle::ColumnType::kPartitionKey,
141143
columnTypes.at(i),
142-
columnTypes.at(i),
143144
nestedField.children[i]));
144145
} else {
145146
columnHandles.push_back(std::make_shared<iceberg::IcebergColumnHandle>(
146147
columnNames.at(i),
147148
connector::hive::HiveColumnHandle::ColumnType::kRegular,
148149
columnTypes.at(i),
149-
columnTypes.at(i),
150150
nestedField.children[i]));
151151
}
152152
}
@@ -157,18 +157,9 @@ std::shared_ptr<IcebergInsertTableHandle> createIcebergInsertTableHandle(
157157
std::shared_ptr<const connector::hive::LocationHandle> locationHandle =
158158
std::make_shared<connector::hive::LocationHandle>(
159159
outputDirectoryPath, outputDirectoryPath, connector::hive::LocationHandle::TableType::kExisting);
160-
const std::vector<IcebergSortingColumn> sortedBy;
161160
const std::unordered_map<std::string, std::string> serdeParameters;
162161
return std::make_shared<connector::hive::iceberg::IcebergInsertTableHandle>(
163-
columnHandles,
164-
locationHandle,
165-
spec,
166-
pool,
167-
fileFormat,
168-
sortedBy,
169-
compressionKind,
170-
serdeParameters,
171-
fileNameGenerator);
162+
columnHandles, locationHandle, fileFormat, spec, compressionKind, serdeParameters, fileNameGenerator);
172163
}
173164

174165
} // namespace
@@ -200,20 +191,36 @@ IcebergWriter::IcebergWriter(
200191
connectorSessionProperties_ = createHiveConnectorSessionConfig(veloxCfg);
201192
connectorConfig_ =
202193
std::make_shared<facebook::velox::connector::hive::HiveConfig>(createHiveConnectorConfig(veloxCfg));
194+
std::unordered_map<std::string, std::shared_ptr<facebook::velox::config::ConfigBase>> connectorConfigs;
195+
connectorConfigs[kHiveConnectorId] = connectorSessionProperties_;
196+
auto queryConfigBase = std::make_shared<facebook::velox::config::ConfigBase>(
197+
std::unordered_map<std::string, std::string>(sparkConfs));
198+
queryCtx_ = facebook::velox::core::QueryCtx::create(
199+
nullptr,
200+
facebook::velox::core::QueryConfig{facebook::velox::core::QueryConfig::ConfigTag{}, queryConfigBase},
201+
connectorConfigs,
202+
nullptr, // cache
203+
pool_,
204+
nullptr, // spillExecutor
205+
"IcebergWriter");
206+
207+
auto expressionEvaluator =
208+
std::make_unique<facebook::velox::exec::SimpleExpressionEvaluator>(queryCtx_.get(), pool_.get());
209+
203210
connectorQueryCtx_ = std::make_unique<connector::ConnectorQueryCtx>(
204211
pool_.get(),
205212
connectorPool_.get(),
206213
connectorSessionProperties_.get(),
207214
nullptr,
208215
common::PrefixSortConfig(),
209-
nullptr,
216+
std::move(expressionEvaluator),
210217
nullptr,
211218
"query.IcebergDataSink",
212219
"task.IcebergDataSink",
213220
"planNodeId.IcebergDataSink",
214221
0,
215222
"");
216-
223+
auto icebergConfig = std::make_shared<facebook::velox::connector::hive::iceberg::IcebergConfig>(veloxCfg);
217224
dataSink_ = std::make_unique<IcebergDataSink>(
218225
rowType_,
219226
createIcebergInsertTableHandle(
@@ -229,7 +236,9 @@ IcebergWriter::IcebergWriter(
229236
pool_.get()),
230237
connectorQueryCtx_.get(),
231238
facebook::velox::connector::CommitStrategy::kNoCommit,
232-
connectorConfig_);
239+
connectorConfig_,
240+
icebergConfig);
241+
dataSink_.get();
233242
}
234243

235244
void IcebergWriter::write(const VeloxColumnarBatch& batch) {
@@ -238,10 +247,19 @@ void IcebergWriter::write(const VeloxColumnarBatch& batch) {
238247

239248
if (inputRowType->size() != rowType_->size()) {
240249
const auto& children = inputRowVector->children();
241-
std::vector<VectorPtr> dataColumns(children.begin() + 1, children.begin() + 1 + rowType_->size());
250+
251+
VELOX_CHECK_GE(children.size(), 1 + rowType_->size());
252+
253+
std::vector<VectorPtr> dataColumns(
254+
children.begin() + 1,
255+
children.begin() + 1 + rowType_->size());
242256

243257
auto filteredRowVector = std::make_shared<RowVector>(
244-
pool_.get(), rowType_, inputRowVector->nulls(), inputRowVector->size(), std::move(dataColumns));
258+
pool_.get(),
259+
rowType_,
260+
nullptr,
261+
inputRowVector->size(),
262+
std::move(dataColumns));
245263

246264
dataSink_->appendData(filteredRowVector);
247265
} else {

0 commit comments

Comments
 (0)