Skip to content

Commit 425bd5d

Browse files
committed
address review: RAII guards, metrics filtering, and TODOs
- Use ArrowSchemaGuard/ArrowArrayGuard in FlushBuffer for memory safety on early returns, fixing potential leaks when nanoarrow macros fail
- Fix guards to handle already-consumed arrays (null release check)
- Filter out value_counts/null_value_counts/nan_value_counts for delete metadata columns (file_path, pos) to match Java parity; also drop bounds when referencing multiple data files
- Add TODO for extracting paths from ArrowArray in Write() to update referenced_paths_ for batch writes
- Add TODO for row_schema support in position deletes (V2 spec)
1 parent e683ce2 commit 425bd5d

File tree

2 files changed

+29
-4
lines changed

2 files changed

+29
-4
lines changed

src/iceberg/arrow_c_data_guard_internal.cc

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -22,13 +22,13 @@
2222
namespace iceberg::internal {
2323

2424
ArrowArrayGuard::~ArrowArrayGuard() {
25-
if (array_ != nullptr) {
25+
if (array_ != nullptr && array_->release != nullptr) {
2626
ArrowArrayRelease(array_);
2727
}
2828
}
2929

3030
ArrowSchemaGuard::~ArrowSchemaGuard() {
31-
if (schema_ != nullptr) {
31+
if (schema_ != nullptr && schema_->release != nullptr) {
3232
ArrowSchemaRelease(schema_);
3333
}
3434
}

src/iceberg/data/position_delete_writer.cc

Lines changed: 27 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@
2626
#include <nanoarrow/nanoarrow.h>
2727

2828
#include "iceberg/arrow/nanoarrow_status_internal.h"
29+
#include "iceberg/arrow_c_data_guard_internal.h"
2930
#include "iceberg/file_writer.h"
3031
#include "iceberg/manifest/manifest_entry.h"
3132
#include "iceberg/metadata_columns.h"
@@ -38,7 +39,8 @@ namespace iceberg {
3839
class PositionDeleteWriter::Impl {
3940
public:
4041
static Result<std::unique_ptr<Impl>> Make(PositionDeleteWriterOptions options) {
41-
// Build the position delete schema with file_path and pos columns
42+
// TODO: Support writing row data if options.row_schema is provided.
43+
// The V2 spec allows position deletes to optionally include the deleted row.
4244
std::vector<SchemaField> fields;
4345
fields.push_back(MetadataColumns::kDeleteFilePath);
4446
fields.push_back(MetadataColumns::kDeleteFilePos);
@@ -61,6 +63,8 @@ class PositionDeleteWriter::Impl {
6163

6264
Status Write(ArrowArray* data) {
6365
ICEBERG_DCHECK(writer_, "Writer not initialized");
66+
// TODO: Extract file paths from ArrowArray to update referenced_paths_ so that
67+
// Metadata() can correctly populate referenced_data_file for batch writes.
6468
return writer_->Write(data);
6569
}
6670

@@ -101,6 +105,26 @@ class PositionDeleteWriter::Impl {
101105
ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length());
102106
auto split_offsets = writer_->split_offsets();
103107

108+
// Filter out metrics for delete metadata columns (file_path, pos) to avoid
109+
// bloating the manifest, matching Java's PositionDeleteWriter behavior.
110+
// Always remove field counts; also remove bounds when referencing multiple files.
111+
const auto path_id = MetadataColumns::kDeleteFilePathColumnId;
112+
const auto pos_id = MetadataColumns::kDeleteFilePosColumnId;
113+
114+
metrics.value_counts.erase(path_id);
115+
metrics.value_counts.erase(pos_id);
116+
metrics.null_value_counts.erase(path_id);
117+
metrics.null_value_counts.erase(pos_id);
118+
metrics.nan_value_counts.erase(path_id);
119+
metrics.nan_value_counts.erase(pos_id);
120+
121+
if (referenced_paths_.size() > 1) {
122+
metrics.lower_bounds.erase(path_id);
123+
metrics.lower_bounds.erase(pos_id);
124+
metrics.upper_bounds.erase(path_id);
125+
metrics.upper_bounds.erase(pos_id);
126+
}
127+
104128
// Serialize literal bounds to binary format
105129
std::map<int32_t, std::vector<uint8_t>> lower_bounds_map;
106130
for (const auto& [col_id, literal] : metrics.lower_bounds) {
@@ -154,11 +178,13 @@ class PositionDeleteWriter::Impl {
154178
Status FlushBuffer() {
155179
ArrowSchema arrow_schema;
156180
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*delete_schema_, &arrow_schema));
181+
internal::ArrowSchemaGuard schema_guard(&arrow_schema);
157182

158183
ArrowArray array;
159184
ArrowError error;
160185
ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
161186
ArrowArrayInitFromSchema(&array, &arrow_schema, &error), error);
187+
internal::ArrowArrayGuard array_guard(&array);
162188
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayStartAppending(&array));
163189

164190
for (size_t i = 0; i < buffered_paths_.size(); ++i) {
@@ -178,7 +204,6 @@ class PositionDeleteWriter::Impl {
178204

179205
buffered_paths_.clear();
180206
buffered_positions_.clear();
181-
arrow_schema.release(&arrow_schema);
182207
return {};
183208
}
184209

0 commit comments

Comments
 (0)