Skip to content

Commit 8e2852a

Browse files
committed
fix: review comments
1 parent c3850c6 commit 8e2852a

2 files changed

Lines changed: 83 additions & 89 deletions

File tree

src/iceberg/update/update_partition_spec.cc

Lines changed: 64 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include "iceberg/update/update_partition_spec.h"
2121

2222
#include <format>
23+
#include <memory>
2324

2425
#include "iceberg/expression/term.h"
2526
#include "iceberg/partition_field.h"
@@ -45,24 +46,19 @@ Result<std::shared_ptr<UpdatePartitionSpec>> UpdatePartitionSpec::Make(
4546

4647
UpdatePartitionSpec::UpdatePartitionSpec(std::shared_ptr<Transaction> transaction)
4748
: PendingUpdate(std::move(transaction)) {
48-
const TableMetadata* base_metadata = transaction_->base();
49-
if (base_metadata == nullptr) [[unlikely]] {
50-
AddError(ErrorKind::kInvalidArgument,
51-
"Base table metadata is required to construct UpdatePartitionSpec");
52-
return;
53-
}
54-
format_version_ = base_metadata->format_version;
49+
const TableMetadata& base_metadata = transaction_->current();
50+
format_version_ = base_metadata.format_version;
5551

5652
// Get the current/default partition spec
57-
auto spec_result = base_metadata->PartitionSpec();
53+
auto spec_result = base_metadata.PartitionSpec();
5854
if (!spec_result.has_value()) {
5955
AddError(spec_result.error());
6056
return;
6157
}
6258
spec_ = std::move(spec_result.value());
6359

6460
// Get the current schema
65-
auto schema_result = base_metadata->Schema();
61+
auto schema_result = base_metadata.Schema();
6662
if (!schema_result.has_value()) {
6763
AddError(schema_result.error());
6864
return;
@@ -112,39 +108,43 @@ UpdatePartitionSpec& UpdatePartitionSpec::AddField(const std::string& source_nam
112108
return AddFieldInternal(nullptr, source_id, Transform::Identity());
113109
}
114110

115-
UpdatePartitionSpec& UpdatePartitionSpec::AddField(std::shared_ptr<NamedReference> term,
116-
std::string part_name) {
117-
// Bind the term to get the source field
118-
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto bound_ref, term->Bind(*schema_, case_sensitive_));
119-
int32_t source_id = bound_ref->field().field_id();
120-
121-
// Reference terms use identity transform
122-
return AddFieldInternal(part_name.empty() ? nullptr : &part_name, source_id,
123-
Transform::Identity());
124-
}
111+
UpdatePartitionSpec& UpdatePartitionSpec::AddField(const std::shared_ptr<Term>& term,
112+
const std::string& part_name) {
113+
ICEBERG_BUILDER_CHECK(term->is_unbound(), "Cannot add bound term to partition spec");
114+
// Bind the term to get the source id
115+
if (term->kind() == Term::Kind::kReference) {
116+
const auto& ref = std::dynamic_pointer_cast<NamedReference>(term);
117+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto bound_ref,
118+
ref->Bind(*schema_, case_sensitive_));
119+
int32_t source_id = bound_ref->field().field_id();
120+
return AddFieldInternal(part_name.empty() ? nullptr : &part_name, source_id,
121+
Transform::Identity());
122+
} else if (term->kind() == Term::Kind::kTransform) {
123+
const auto& unbound_transform = std::dynamic_pointer_cast<UnboundTransform>(term);
124+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto bound_transform,
125+
unbound_transform->Bind(*schema_, case_sensitive_));
126+
int32_t source_id = bound_transform->reference()->field().field_id();
127+
return AddFieldInternal(part_name.empty() ? nullptr : &part_name, source_id,
128+
bound_transform->transform());
129+
}
125130

126-
UpdatePartitionSpec& UpdatePartitionSpec::AddField(std::shared_ptr<UnboundTransform> term,
127-
std::string part_name) {
128-
// Bind the term to get the source field and transform
129-
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto bound_transform,
130-
term->Bind(*schema_, case_sensitive_));
131-
int32_t source_id = bound_transform->reference()->field().field_id();
132-
return AddFieldInternal(part_name.empty() ? nullptr : &part_name, source_id,
133-
bound_transform->transform());
131+
ICEBERG_BUILDER_CHECK(false, "Cannot add {} term to partition spec", term->ToString());
132+
std::unreachable();
134133
}
135134

136135
UpdatePartitionSpec& UpdatePartitionSpec::AddFieldInternal(
137-
const std::string* name, int32_t source_id, std::shared_ptr<Transform> transform) {
138-
// Cache transform string to avoid repeated ToString() calls
139-
const std::string transform_str = transform->ToString();
140-
TransformKey validation_key{source_id, transform_str};
141-
136+
const std::string* name, int32_t source_id,
137+
const std::shared_ptr<Transform>& transform) {
142138
// Check for duplicate name in added fields
143139
if (name != nullptr) {
144140
ICEBERG_BUILDER_CHECK(!added_field_names_.contains(*name),
145141
"Cannot add duplicate partition field: {}", *name);
146142
}
147143

144+
// Cache transform string to avoid repeated ToString() calls
145+
const std::string transform_str = transform->ToString();
146+
TransformKey validation_key{source_id, transform_str};
147+
148148
// Check if this field already exists in the current spec
149149
auto existing_it = transform_to_field_.find(validation_key);
150150
if (existing_it != transform_to_field_.end()) {
@@ -157,18 +157,18 @@ UpdatePartitionSpec& UpdatePartitionSpec::AddFieldInternal(
157157

158158
ICEBERG_BUILDER_CHECK(
159159
is_deleted,
160-
"Cannot add duplicate partition field {} for source {} with transform {}, "
160+
"Cannot add duplicate partition field '{}' for source {} with transform {}, "
161161
"conflicts with {}",
162-
name ? *name : "unknown", source_id, transform_str, existing->ToString());
162+
name ? *name : "", source_id, transform_str, existing->ToString());
163163
}
164164

165165
// Check if already being added
166166
auto added_it = transform_to_added_field_.find(validation_key);
167167
ICEBERG_BUILDER_CHECK(
168168
added_it == transform_to_added_field_.end(),
169-
"Cannot add duplicate partition field {} for source {} with transform {}, "
169+
"Cannot add duplicate partition field '{}' for source {} with transform {}, "
170170
"already added: {}",
171-
name ? *name : "unknown", source_id, transform_str, added_it->second);
171+
name ? *name : "", source_id, transform_str, added_it->second);
172172

173173
// Create or recycle the partition field
174174
PartitionField new_field = RecycleOrCreatePartitionField(source_id, transform, name);
@@ -244,27 +244,32 @@ UpdatePartitionSpec& UpdatePartitionSpec::RemoveField(const std::string& name) {
244244
return *this;
245245
}
246246

247-
UpdatePartitionSpec& UpdatePartitionSpec::RemoveField(
248-
std::shared_ptr<NamedReference> term) {
249-
// Bind the term to get the source field
250-
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto bound_ref, term->Bind(*schema_, case_sensitive_));
251-
int32_t source_id = bound_ref->field().field_id();
252-
253-
// Reference terms use identity transform
254-
TransformKey key{source_id, Transform::Identity()->ToString()};
255-
return RemoveFieldByTransform(key, term->ToString());
256-
}
257-
258-
UpdatePartitionSpec& UpdatePartitionSpec::RemoveField(
259-
std::shared_ptr<UnboundTransform> term) {
260-
// Bind the term to get the source field and transform
261-
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto bound_transform,
262-
term->Bind(*schema_, case_sensitive_));
263-
int32_t source_id = bound_transform->reference()->field().field_id();
264-
auto transform = bound_transform->transform();
247+
UpdatePartitionSpec& UpdatePartitionSpec::RemoveField(const std::shared_ptr<Term>& term) {
248+
ICEBERG_BUILDER_CHECK(term->is_unbound(),
249+
"Cannot remove bound term from partition spec");
250+
// Bind the term to get the source id
251+
if (term->kind() == Term::Kind::kReference) {
252+
const auto& ref = std::dynamic_pointer_cast<NamedReference>(term);
253+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto bound_ref,
254+
ref->Bind(*schema_, case_sensitive_));
255+
int32_t source_id = bound_ref->field().field_id();
256+
// Reference terms use identity transform
257+
TransformKey key{source_id, Transform::Identity()->ToString()};
258+
return RemoveFieldByTransform(key, term->ToString());
259+
} else if (term->kind() == Term::Kind::kTransform) {
260+
const auto& unbound_transform = std::dynamic_pointer_cast<UnboundTransform>(term);
261+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto bound_transform,
262+
unbound_transform->Bind(*schema_, case_sensitive_));
263+
int32_t source_id = bound_transform->reference()->field().field_id();
264+
auto transform = bound_transform->transform();
265+
266+
TransformKey key{source_id, transform->ToString()};
267+
return RemoveFieldByTransform(key, term->ToString());
268+
}
265269

266-
TransformKey key{source_id, transform->ToString()};
267-
return RemoveFieldByTransform(key, term->ToString());
270+
ICEBERG_BUILDER_CHECK(false, "Cannot remove {} term from partition spec",
271+
term->ToString());
272+
std::unreachable();
268273
}
269274

270275
UpdatePartitionSpec& UpdatePartitionSpec::RemoveFieldByTransform(
@@ -461,25 +466,20 @@ UpdatePartitionSpec::IndexSpecByTransform(const PartitionSpec& spec) {
461466
}
462467

463468
void UpdatePartitionSpec::BuildHistoricalFieldsIndex() {
464-
const TableMetadata* base_metadata = transaction_->base();
465-
if (base_metadata == nullptr) {
466-
return;
467-
}
469+
const TableMetadata& base_metadata = transaction_->current();
468470

469471
// Count total fields across all specs to reserve capacity
470472
size_t total_fields = 0;
471-
for (const auto& partition_spec : base_metadata->partition_specs) {
473+
for (const auto& partition_spec : base_metadata.partition_specs) {
472474
total_fields += partition_spec->fields().size();
473475
}
474476
historical_fields_.reserve(total_fields);
475477

476478
// Index all fields from all historical partition specs
477479
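// emplace() does not overwrite an existing key, so the first spec seen wins for a given
// (source_id, transform) key. NOTE(review): confirm historical_fields_ has unique keys.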
478-
for (const auto& partition_spec : base_metadata->partition_specs) {
480+
for (const auto& partition_spec : base_metadata.partition_specs) {
479481
for (const auto& field : partition_spec->fields()) {
480482
TransformKey key{field.source_id(), field.transform()->ToString()};
481-
// Use emplace to only insert if key doesn't exist, preserving first occurrence
482-
// This ensures we get the earliest field ID for recycling
483483
historical_fields_.emplace(key, field);
484484
}
485485
}

src/iceberg/update/update_partition_spec.h

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@
2828
#include <unordered_set>
2929
#include <vector>
3030

31+
#include "iceberg/expression/term.h"
3132
#include "iceberg/iceberg_export.h"
3233
#include "iceberg/result.h"
33-
#include "iceberg/table_identifier.h"
3434
#include "iceberg/type_fwd.h"
3535
#include "iceberg/update/pending_update.h"
3636

@@ -61,43 +61,27 @@ class ICEBERG_EXPORT UpdatePartitionSpec : public PendingUpdate {
6161

6262
/// \brief Add a new partition field with a custom name.
6363
///
64-
/// \param term The named reference representing the source column.
64+
/// \param term The unbound term (a reference or a transform) identifying the source column.
6565
/// \param part_name Name for the partition field.
6666
/// \return Reference to this for method chaining.
67-
UpdatePartitionSpec& AddField(std::shared_ptr<NamedReference> term,
68-
std::string part_name = "");
69-
70-
/// \brief Add a new partition field with a custom name from an unbound transform.
71-
///
72-
/// \param term The unbound transform representing the partition transform.
73-
/// \param part_name Name for the partition field.
74-
/// \return Reference to this for method chaining.
75-
UpdatePartitionSpec& AddField(std::shared_ptr<UnboundTransform> term,
76-
std::string part_name = "");
67+
UpdatePartitionSpec& AddField(const std::shared_ptr<Term>& term,
68+
const std::string& part_name = "");
7769

7870
/// \brief Remove a partition field by name.
7971
///
8072
/// \param name Name of the partition field to remove.
8173
/// \return Reference to this for method chaining.
8274
UpdatePartitionSpec& RemoveField(const std::string& name);
8375

84-
/// \brief Remove a partition field by its transform term.
76+
/// \brief Remove a partition field by its source term.
8577
///
8678
/// The partition field with the same transform and source reference will be removed.
8779
/// If the term is a reference and does not have a transform, the identity transform
8880
/// is used.
8981
///
90-
/// \param term The named reference representing the partition transform to remove.
91-
/// \return Reference to this for method chaining.
92-
UpdatePartitionSpec& RemoveField(std::shared_ptr<NamedReference> term);
93-
94-
/// \brief Remove a partition field by its transform term.
95-
///
96-
/// The partition field with the same transform and source reference will be removed.
97-
///
98-
/// \param term The unbound transform representing the partition transform to remove.
82+
/// \param term The unbound term (a reference or a transform) identifying the field to remove.
9983
/// \return Reference to this for method chaining.
100-
UpdatePartitionSpec& RemoveField(std::shared_ptr<UnboundTransform> term);
84+
UpdatePartitionSpec& RemoveField(const std::shared_ptr<Term>& term);
10185

10286
/// \brief Rename a field in the partition spec.
10387
///
@@ -133,17 +117,27 @@ class ICEBERG_EXPORT UpdatePartitionSpec : public PendingUpdate {
133117
/// \brief Assign a new partition field ID.
134118
int32_t AssignFieldId();
135119

136-
/// \brief Recycle or create a partition field.
137120
///
138121
/// In V2, searches for a similar partition field in historical specs.
139122
/// If not found or in V1, creates a new PartitionField.
123+
124+
/// \brief Recycle or create a partition field.
125+
///
126+
/// In V2 it searches for a similar partition field in historical partition specs. Tries
127+
/// to match on source field ID, transform type and target name (optional). If not found
128+
/// or in V1 cases it creates a new PartitionField.
129+
///
130+
/// \param source_id The source field ID.
131+
/// \param transform The transform function.
132+
/// \param name The target partition field name, if specified.
133+
/// \return The recycled or newly created partition field.
140134
PartitionField RecycleOrCreatePartitionField(int32_t source_id,
141135
std::shared_ptr<Transform> transform,
142136
const std::string* name);
143137

144138
/// \brief Internal implementation of AddField with resolved source ID and transform.
145139
UpdatePartitionSpec& AddFieldInternal(const std::string* name, int32_t source_id,
146-
std::shared_ptr<Transform> transform);
140+
const std::shared_ptr<Transform>& transform);
147141

148142
/// \brief Generate a partition field name from the source and transform.
149143
std::string GeneratePartitionName(int32_t source_id,

0 commit comments

Comments
 (0)