Skip to content

Commit 6d96d61

Browse files
committed
fix: review comments
1 parent 6881200 commit 6d96d61

5 files changed

Lines changed: 99 additions & 116 deletions

File tree

src/iceberg/transform.cc

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,28 @@ std::string Transform::ToString() const {
388388
std::unreachable();
389389
}
390390

391+
std::string Transform::GeneratePartitionName(std::string_view source_name) const {
392+
switch (transform_type_) {
393+
case TransformType::kIdentity:
394+
return std::string(source_name);
395+
case TransformType::kBucket:
396+
// Format: sourceName_bucket_N (matching Java: sourceName + "_bucket_" + numBuckets)
397+
return std::format("{}_bucket_{}", source_name, std::get<int32_t>(param_));
398+
case TransformType::kTruncate:
399+
// Format: sourceName_trunc_N (matching Java: sourceName + "_trunc_" + width)
400+
return std::format("{}_trunc_{}", source_name, std::get<int32_t>(param_));
401+
case TransformType::kYear:
402+
case TransformType::kMonth:
403+
case TransformType::kDay:
404+
case TransformType::kHour:
405+
case TransformType::kUnknown:
406+
return std::format("{}_{}", source_name, TransformTypeToString(transform_type_));
407+
case TransformType::kVoid:
408+
return std::format("{}_null", source_name);
409+
}
410+
std::unreachable();
411+
}
412+
391413
TransformFunction::TransformFunction(TransformType transform_type,
392414
std::shared_ptr<Type> source_type)
393415
: transform_type_(transform_type), source_type_(std::move(source_type)) {}

src/iceberg/transform.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,11 @@ class ICEBERG_EXPORT Transform : public util::Formattable {
197197
/// \brief Returns a string representation of this transform (e.g., "bucket[16]").
198198
std::string ToString() const override;
199199

200+
/// \brief Generates a partition name for the transform.
201+
/// \param source_name The name of the source column.
202+
/// \return A string representation of the partition name.
203+
std::string GeneratePartitionName(std::string_view source_name) const;
204+
200205
/// \brief Equality comparison.
201206
friend bool operator==(const Transform& lhs, const Transform& rhs) {
202207
return lhs.Equals(rhs);

src/iceberg/transform_function.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ class ICEBERG_EXPORT BucketTransform : public TransformFunction {
5959
/// \brief Returns INT32 as the output type.
6060
std::shared_ptr<Type> ResultType() const override;
6161

62+
/// \brief Returns the number of buckets.
63+
int32_t num_buckets() const { return num_buckets_; }
64+
6265
/// \brief Create a BucketTransform.
6366
/// \param source_type Type of the input data.
6467
/// \param num_buckets Number of buckets to hash into.

src/iceberg/update/update_partition_spec.cc

Lines changed: 60 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,16 @@ UpdatePartitionSpec::UpdatePartitionSpec(TableIdentifier identifier,
4343
: identifier_(std::move(identifier)),
4444
catalog_(std::move(catalog)),
4545
base_metadata_(std::move(base)) {
46-
ICEBERG_DCHECK(catalog_, "Catalog is required to construct UpdatePartitionSpec");
47-
ICEBERG_DCHECK(base_metadata_,
48-
"Base table metadata is required to construct UpdatePartitionSpec");
46+
if (catalog_ == nullptr) [[unlikely]] {
47+
AddError(ErrorKind::kInvalidArgument,
48+
"Catalog is required to construct UpdatePartitionSpec");
49+
return;
50+
}
51+
if (base_metadata_ == nullptr) [[unlikely]] {
52+
AddError(ErrorKind::kInvalidArgument,
53+
"Base table metadata is required to construct UpdatePartitionSpec");
54+
return;
55+
}
4956
format_version_ = base_metadata_->format_version;
5057

5158
// Get the current/default partition spec
@@ -94,17 +101,12 @@ UpdatePartitionSpec& UpdatePartitionSpec::AddNonDefaultSpec() {
94101
UpdatePartitionSpec& UpdatePartitionSpec::AddField(const std::string& source_name) {
95102
// Find the source field in the schema
96103
auto field_result = schema_->FindFieldByName(source_name, case_sensitive_);
97-
if (!field_result.has_value()) {
98-
AddError(ErrorKind::kInvalidArgument,
99-
std::format("Cannot find source field: {}", source_name));
100-
return *this;
101-
}
104+
BUILDER_RETURN_IF_ERROR(field_result);
102105

103106
auto field_opt = field_result.value();
104107
if (!field_opt.has_value()) {
105-
AddError(ErrorKind::kInvalidArgument,
106-
std::format("Cannot find source field: {}", source_name));
107-
return *this;
108+
return AddError(ErrorKind::kInvalidArgument,
109+
std::format("Cannot find source field: {}", source_name));
108110
}
109111

110112
int32_t source_id = field_opt.value().get().field_id();
@@ -160,8 +162,8 @@ UpdatePartitionSpec& UpdatePartitionSpec::AddFieldInternal(
160162
const std::string* name, int32_t source_id, std::shared_ptr<Transform> transform) {
161163
// Check for duplicate name in added fields
162164
if (name != nullptr) {
163-
auto it = name_to_added_field_.find(*name);
164-
if (it != name_to_added_field_.end()) {
165+
auto it = added_field_names_.find(*name);
166+
if (it != added_field_names_.end()) {
165167
AddError(ErrorKind::kInvalidArgument,
166168
std::format("Cannot add duplicate partition field: {}", *name));
167169
return *this;
@@ -174,30 +176,32 @@ UpdatePartitionSpec& UpdatePartitionSpec::AddFieldInternal(
174176
auto existing_it = transform_to_field_.find(validation_key);
175177
if (existing_it != transform_to_field_.end()) {
176178
const auto& existing = existing_it->second;
177-
if (deletes_.contains(existing.field_id()) && *existing.transform() == *transform) {
179+
if (deletes_.contains(existing->field_id()) && *existing->transform() == *transform) {
178180
// If the field was deleted and we're re-adding the same one, just undo the delete
179-
return RewriteDeleteAndAddField(existing, name);
181+
return RewriteDeleteAndAddField(*existing, name);
180182
}
181183

182-
if (deletes_.find(existing.field_id()) == deletes_.end()) {
184+
if (deletes_.find(existing->field_id()) == deletes_.end()) {
183185
AddError(
184186
ErrorKind::kInvalidArgument,
185187
std::format(
186-
"Cannot add duplicate partition field for source {} with transform {}, "
188+
"Cannot add duplicate partition field {} for source {} with transform {}, "
187189
"conflicts with {}",
188-
source_id, transform->ToString(), existing.ToString()));
190+
name ? *name : "unknown", source_id, transform->ToString(),
191+
existing->ToString()));
189192
return *this;
190193
}
191194
}
192195

193196
// Check if already being added
194-
auto added_it = transform_to_added_field_.find(validation_key);
195-
if (added_it != transform_to_added_field_.end()) {
196-
AddError(ErrorKind::kInvalidArgument,
197-
std::format(
198-
"Cannot add duplicate partition field for source {} with transform {}, "
199-
"already added: {}",
200-
source_id, transform->ToString(), added_it->second.ToString()));
197+
if (transform_to_added_field_.contains(validation_key)) {
198+
AddError(
199+
ErrorKind::kInvalidArgument,
200+
std::format(
201+
"Cannot add duplicate partition field {} for source {} with transform {}, "
202+
"already added: {}",
203+
name ? *name : "unknown", source_id, transform->ToString(),
204+
transform_to_added_field_.at(validation_key)));
201205
return *this;
202206
}
203207

@@ -218,19 +222,18 @@ UpdatePartitionSpec& UpdatePartitionSpec::AddFieldInternal(
218222

219223
// Check for redundant time-based partitions
220224
CheckForRedundantAddedPartitions(new_field);
221-
222-
transform_to_added_field_.emplace(validation_key, new_field);
225+
transform_to_added_field_.emplace(validation_key, field_name);
223226

224227
// Handle name conflicts with existing fields
225228
auto existing_name_it = name_to_field_.find(field_name);
226229
if (existing_name_it != name_to_field_.end()) {
227230
const auto& existing_field = existing_name_it->second;
228-
if (!deletes_.contains(existing_field.field_id())) {
229-
if (IsVoidTransform(existing_field)) {
231+
if (!deletes_.contains(existing_field->field_id())) {
232+
if (IsVoidTransform(*existing_field)) {
230233
// Rename the old deleted field
231234
std::string renamed =
232-
std::format("{}_{}", existing_field.name(), existing_field.field_id());
233-
renames_[std::string(existing_field.name())] = renamed;
235+
std::format("{}_{}", existing_field->name(), existing_field->field_id());
236+
RenameField(std::string(existing_field->name()), renamed);
234237
} else {
235238
AddError(
236239
ErrorKind::kInvalidArgument,
@@ -240,13 +243,13 @@ UpdatePartitionSpec& UpdatePartitionSpec::AddFieldInternal(
240243
} else {
241244
// Field is being deleted, rename it to avoid conflict
242245
std::string renamed =
243-
std::format("{}_{}", existing_field.name(), existing_field.field_id());
244-
renames_[std::string(existing_field.name())] = renamed;
246+
std::format("{}_{}", existing_field->name(), existing_field->field_id());
247+
renames_[std::string(existing_field->name())] = renamed;
245248
}
246249
}
247250

248-
name_to_added_field_.emplace(field_name, new_field);
249251
adds_.push_back(new_field);
252+
added_field_names_.emplace(field_name);
250253

251254
return *this;
252255
}
@@ -262,8 +265,7 @@ UpdatePartitionSpec& UpdatePartitionSpec::RewriteDeleteAndAddField(
262265

263266
UpdatePartitionSpec& UpdatePartitionSpec::RemoveField(const std::string& name) {
264267
// Cannot delete newly added fields
265-
auto added_it = name_to_added_field_.find(name);
266-
if (added_it != name_to_added_field_.end()) {
268+
if (added_field_names_.contains(name)) {
267269
AddError(ErrorKind::kInvalidArgument,
268270
std::format("Cannot delete newly added field: {}", name));
269271
return *this;
@@ -283,7 +285,7 @@ UpdatePartitionSpec& UpdatePartitionSpec::RemoveField(const std::string& name) {
283285
return *this;
284286
}
285287

286-
deletes_.insert(field_it->second.field_id());
288+
deletes_.insert(field_it->second->field_id());
287289
return *this;
288290
}
289291

@@ -326,8 +328,7 @@ UpdatePartitionSpec& UpdatePartitionSpec::RemoveField(
326328
UpdatePartitionSpec& UpdatePartitionSpec::RemoveFieldByTransform(
327329
const TransformKey& key, const std::string& term_str) {
328330
// Cannot delete newly added fields
329-
auto added_it = transform_to_added_field_.find(key);
330-
if (added_it != transform_to_added_field_.end()) {
331+
if (transform_to_added_field_.contains(key)) {
331332
AddError(ErrorKind::kInvalidArgument,
332333
std::format("Cannot delete newly added field: {}", term_str));
333334
return *this;
@@ -342,29 +343,28 @@ UpdatePartitionSpec& UpdatePartitionSpec::RemoveFieldByTransform(
342343

343344
const auto& field = field_it->second;
344345
// Cannot rename and delete
345-
if (renames_.find(std::string(field.name())) != renames_.end()) {
346+
if (renames_.find(std::string(field->name())) != renames_.end()) {
346347
AddError(ErrorKind::kInvalidArgument,
347-
std::format("Cannot rename and delete partition field: {}", field.name()));
348+
std::format("Cannot rename and delete partition field: {}", field->name()));
348349
return *this;
349350
}
350351

351-
deletes_.insert(field.field_id());
352+
deletes_.insert(field->field_id());
352353
return *this;
353354
}
354355

355356
UpdatePartitionSpec& UpdatePartitionSpec::RenameField(const std::string& name,
356357
const std::string& new_name) {
357358
// Handle existing void field with the new name
358359
auto existing_it = name_to_field_.find(new_name);
359-
if (existing_it != name_to_field_.end() && IsVoidTransform(existing_it->second)) {
360-
std::string renamed =
361-
std::format("{}_{}", existing_it->second.name(), existing_it->second.field_id());
362-
renames_[new_name] = renamed;
360+
if (existing_it != name_to_field_.end() && IsVoidTransform(*existing_it->second)) {
361+
std::string renamed = std::format("{}_{}", existing_it->second->name(),
362+
existing_it->second->field_id());
363+
RenameField(std::string(existing_it->second->name()), renamed);
363364
}
364365

365366
// Cannot rename newly added fields
366-
auto added_it = name_to_added_field_.find(name);
367-
if (added_it != name_to_added_field_.end()) {
367+
if (added_field_names_.contains(name)) {
368368
AddError(ErrorKind::kInvalidArgument,
369369
std::format("Cannot rename newly added partition field: {}", name));
370370
return *this;
@@ -378,7 +378,7 @@ UpdatePartitionSpec& UpdatePartitionSpec::RenameField(const std::string& name,
378378
}
379379

380380
// Cannot delete and rename
381-
if (deletes_.contains(field_it->second.field_id())) {
381+
if (deletes_.contains(field_it->second->field_id())) {
382382
AddError(ErrorKind::kInvalidArgument,
383383
std::format("Cannot delete and rename partition field: {}", name));
384384
return *this;
@@ -507,51 +507,7 @@ std::string UpdatePartitionSpec::GeneratePartitionName(
507507
source_name = std::string(field_result.value().value().get().name());
508508
}
509509

510-
// Extract parameter from transform string for bucket and truncate
511-
// Transform::ToString() returns "bucket[16]" or "truncate[4]" format
512-
std::string transform_str = transform->ToString();
513-
514-
switch (transform->transform_type()) {
515-
case TransformType::kIdentity:
516-
return source_name;
517-
case TransformType::kBucket: {
518-
// Parse "bucket[N]" to extract N
519-
// Format: sourceName_bucket_N (matching Java: sourceName + "_bucket_" + numBuckets)
520-
size_t open_bracket = transform_str.find('[');
521-
size_t close_bracket = transform_str.find(']');
522-
if (open_bracket != std::string::npos && close_bracket != std::string::npos) {
523-
std::string param_str =
524-
transform_str.substr(open_bracket + 1, close_bracket - open_bracket - 1);
525-
return std::format("{}_{}_{}", source_name, "bucket", param_str);
526-
}
527-
return std::format("{}_bucket", source_name);
528-
}
529-
case TransformType::kTruncate: {
530-
// Parse "truncate[N]" to extract N
531-
// Format: sourceName_trunc_N (matching Java: sourceName + "_trunc_" + width)
532-
size_t open_bracket = transform_str.find('[');
533-
size_t close_bracket = transform_str.find(']');
534-
if (open_bracket != std::string::npos && close_bracket != std::string::npos) {
535-
std::string param_str =
536-
transform_str.substr(open_bracket + 1, close_bracket - open_bracket - 1);
537-
return std::format("{}_{}_{}", source_name, "trunc", param_str);
538-
}
539-
return std::format("{}_trunc", source_name);
540-
}
541-
case TransformType::kYear:
542-
return std::format("{}_year", source_name);
543-
case TransformType::kMonth:
544-
return std::format("{}_month", source_name);
545-
case TransformType::kDay:
546-
return std::format("{}_day", source_name);
547-
case TransformType::kHour:
548-
return std::format("{}_hour", source_name);
549-
case TransformType::kVoid:
550-
return std::format("{}_null", source_name);
551-
case TransformType::kUnknown:
552-
return std::format("{}_unknown", source_name);
553-
}
554-
std::unreachable();
510+
return transform->GeneratePartitionName(source_name);
555511
}
556512

557513
bool UpdatePartitionSpec::IsTimeTransform(const std::shared_ptr<Transform>& transform) {
@@ -571,36 +527,33 @@ bool UpdatePartitionSpec::IsVoidTransform(const PartitionField& field) {
571527
}
572528

573529
void UpdatePartitionSpec::CheckForRedundantAddedPartitions(const PartitionField& field) {
574-
if (HasErrors()) return;
575-
576530
if (IsTimeTransform(field.transform())) {
577-
auto it = added_time_fields_.find(field.source_id());
578-
if (it != added_time_fields_.end()) {
531+
if (added_time_fields_.contains(field.source_id())) {
579532
AddError(ErrorKind::kInvalidArgument,
580533
std::format("Cannot add redundant partition field: {} conflicts with {}",
581-
field.ToString(), it->second.ToString()));
534+
field.ToString(), added_time_fields_.at(field.source_id())));
582535
return;
583536
}
584-
added_time_fields_.emplace(field.source_id(), field);
537+
added_time_fields_.emplace(field.source_id(), field.ToString());
585538
}
586539
}
587540

588-
std::unordered_map<std::string, PartitionField> UpdatePartitionSpec::IndexSpecByName(
589-
const PartitionSpec& spec) {
590-
std::unordered_map<std::string, PartitionField> index;
541+
std::unordered_map<std::string, const PartitionField*>
542+
UpdatePartitionSpec::IndexSpecByName(const PartitionSpec& spec) {
543+
std::unordered_map<std::string, const PartitionField*> index;
591544
for (const auto& field : spec.fields()) {
592-
index.emplace(std::string(field.name()), field);
545+
index.emplace(std::string(field.name()), &field);
593546
}
594547
return index;
595548
}
596549

597-
std::unordered_map<UpdatePartitionSpec::TransformKey, PartitionField,
550+
std::unordered_map<UpdatePartitionSpec::TransformKey, const PartitionField*,
598551
UpdatePartitionSpec::TransformKeyHash>
599552
UpdatePartitionSpec::IndexSpecByTransform(const PartitionSpec& spec) {
600-
std::unordered_map<TransformKey, PartitionField, TransformKeyHash> index;
553+
std::unordered_map<TransformKey, const PartitionField*, TransformKeyHash> index;
601554
for (const auto& field : spec.fields()) {
602555
TransformKey key{field.source_id(), field.transform()->ToString()};
603-
index.emplace(key, field);
556+
index.emplace(key, &field);
604557
}
605558
return index;
606559
}

0 commit comments

Comments
 (0)