Skip to content

Commit 181fc8a

Browse files
authored
feat: metrics for avro writer (#604)
1 parent 5f4904f commit 181fc8a

File tree

5 files changed

+130
-7
lines changed

5 files changed

+130
-7
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ if(ICEBERG_BUILD_BUNDLE)
183183
avro/avro_data_util.cc
184184
avro/avro_direct_decoder.cc
185185
avro/avro_direct_encoder.cc
186+
avro/avro_metrics.cc
186187
avro/avro_reader.cc
187188
avro/avro_writer.cc
188189
avro/avro_register.cc

src/iceberg/avro/avro_metrics.cc

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/avro/avro_metrics.h"
21+
22+
namespace iceberg::avro {
23+
24+
Metrics AvroMetrics::GetMetrics(const Schema& /*schema*/, int64_t num_records,
25+
const MetricsConfig& /*metrics_config*/) {
26+
// TODO(WZhuo) will populate in following PRs if datum writer is a
27+
// MetricsAwareDatumWriter
28+
return Metrics{.row_count = num_records};
29+
}
30+
31+
} // namespace iceberg::avro

src/iceberg/avro/avro_metrics.h

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include "iceberg/iceberg_bundle_export.h"
23+
#include "iceberg/metrics.h"
24+
25+
namespace iceberg {
26+
class Schema;
27+
class MetricsConfig;
28+
} // namespace iceberg
29+
30+
namespace iceberg::avro {
31+
32+
/// \brief Utility class for computing Avro file metrics.
33+
class ICEBERG_BUNDLE_EXPORT AvroMetrics {
34+
public:
35+
AvroMetrics() = delete;
36+
37+
/// \brief Compute metrics from writer state.
38+
/// \param schema The Iceberg schema of the written data.
39+
/// \param num_records The number of records written.
40+
/// \param metrics_config The metrics configuration.
41+
/// \return Metrics for the written Avro file.
42+
static Metrics GetMetrics(const Schema& schema, int64_t num_records,
43+
const MetricsConfig& metrics_config);
44+
};
45+
46+
} // namespace iceberg::avro

src/iceberg/avro/avro_writer.cc

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,11 @@
3333
#include "iceberg/arrow/arrow_status_internal.h"
3434
#include "iceberg/avro/avro_data_util_internal.h"
3535
#include "iceberg/avro/avro_direct_encoder_internal.h"
36+
#include "iceberg/avro/avro_metrics.h"
3637
#include "iceberg/avro/avro_register.h"
3738
#include "iceberg/avro/avro_schema_util_internal.h"
3839
#include "iceberg/avro/avro_stream_internal.h"
40+
#include "iceberg/metrics_config.h"
3941
#include "iceberg/schema.h"
4042
#include "iceberg/schema_internal.h"
4143
#include "iceberg/util/checked_cast.h"
@@ -238,6 +240,7 @@ class AvroWriter::Impl {
238240
ICEBERG_RETURN_UNEXPECTED(backend_->WriteRow(*write_schema_, *result, i));
239241
}
240242

243+
num_records_ += result->length();
241244
return {};
242245
}
243246

@@ -261,6 +264,14 @@ class AvroWriter::Impl {
261264
return current_pos;
262265
}
263266

267+
Result<Metrics> metrics() const {
268+
if (!Closed()) {
269+
return Invalid("AvroWriter is not closed");
270+
}
271+
return AvroMetrics::GetMetrics(*write_schema_, num_records_,
272+
*MetricsConfig::Default());
273+
}
274+
264275
private:
265276
// The schema to write.
266277
std::shared_ptr<::iceberg::Schema> write_schema_;
@@ -272,6 +283,8 @@ class AvroWriter::Impl {
272283
ArrowSchema arrow_schema_;
273284
// Total length of the written Avro file.
274285
int64_t total_bytes_ = 0;
286+
// Number of records written.
287+
int64_t num_records_ = 0;
275288
// The write backend to write data.
276289
std::unique_ptr<AvroWriteBackend> backend_;
277290
};
@@ -292,13 +305,7 @@ Status AvroWriter::Close() {
292305
return {};
293306
}
294307

295-
Result<Metrics> AvroWriter::metrics() {
296-
if (impl_->Closed()) {
297-
// TODO(xiao.dong) implement metrics
298-
return {};
299-
}
300-
return Invalid("AvroWriter is not closed");
301-
}
308+
Result<Metrics> AvroWriter::metrics() { return impl_->metrics(); }
302309

303310
Result<int64_t> AvroWriter::length() { return impl_->length(); }
304311

src/iceberg/test/avro_test.cc

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -936,6 +936,44 @@ TEST_P(AvroWriterTest, MultipleAvroBlocks) {
936936
}
937937
}
938938

939+
TEST_P(AvroWriterTest, Metrics) {
940+
auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
941+
SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
942+
SchemaField::MakeOptional(2, "name", std::make_shared<StringType>())});
943+
944+
std::string test_data = R"([[1, "Alice"], [2, "Bob"], [3, "Charlie"]])";
945+
946+
// Write data but don't close yet
947+
ArrowSchema arrow_c_schema;
948+
ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk());
949+
auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
950+
auto array = ::arrow::json::ArrayFromJSONString(arrow_schema, test_data).ValueOrDie();
951+
struct ArrowArray arrow_array;
952+
ASSERT_TRUE(::arrow::ExportArray(*array, &arrow_array).ok());
953+
954+
ICEBERG_UNWRAP_OR_FAIL(
955+
writer_,
956+
WriterFactoryRegistry::Open(
957+
FileFormatType::kAvro,
958+
{.path = temp_avro_file_, .schema = schema, .io = file_io_, .properties = {}}));
959+
ASSERT_THAT(writer_->Write(&arrow_array), IsOk());
960+
961+
// Metrics should fail before close
962+
ASSERT_THAT(writer_->metrics(), IsError(ErrorKind::kInvalid));
963+
964+
// After close, metrics should succeed
965+
ASSERT_THAT(writer_->Close(), IsOk());
966+
ICEBERG_UNWRAP_OR_FAIL(auto metrics, writer_->metrics());
967+
ASSERT_TRUE(metrics.row_count.has_value());
968+
EXPECT_EQ(metrics.row_count.value(), 3);
969+
EXPECT_TRUE(metrics.column_sizes.empty());
970+
EXPECT_TRUE(metrics.value_counts.empty());
971+
EXPECT_TRUE(metrics.null_value_counts.empty());
972+
EXPECT_TRUE(metrics.nan_value_counts.empty());
973+
EXPECT_TRUE(metrics.lower_bounds.empty());
974+
EXPECT_TRUE(metrics.upper_bounds.empty());
975+
}
976+
939977
// Instantiate parameterized tests for both direct encoder and GenericDatum paths
940978
INSTANTIATE_TEST_SUITE_P(DirectEncoderModes, AvroWriterTest,
941979
::testing::Values(true, false),

0 commit comments

Comments
 (0)