Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/iceberg/catalog/rest/json_serde.cc
Original file line number Diff line number Diff line change
Expand Up @@ -379,9 +379,9 @@ Result<CreateTableRequest> CreateTableRequestFromJson(const nlohmann::json& json
if (json.contains(kPartitionSpec)) {
ICEBERG_ASSIGN_OR_RAISE(auto partition_spec,
GetJsonValue<nlohmann::json>(json, kPartitionSpec));
ICEBERG_ASSIGN_OR_RAISE(request.partition_spec,
PartitionSpecFromJson(request.schema, partition_spec,
PartitionSpec::kInitialSpecId));
ICEBERG_ASSIGN_OR_RAISE(
request.partition_spec,
PartitionSpecFromJson(request.schema, partition_spec, kInitialSpecId));
}
if (json.contains(kWriteOrder)) {
ICEBERG_ASSIGN_OR_RAISE(auto sort_order,
Expand Down
15 changes: 14 additions & 1 deletion src/iceberg/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,19 @@ constexpr int64_t kInvalidSequenceNumber = -1;
/// adapter.
constexpr int64_t kUnassignedSequenceNumber = -1;

// TODO(gangwu): move other commonly used constants here.
constexpr int32_t kInitialSchemaId = 0;
constexpr int32_t kInitialColumnId = 0;
constexpr int32_t kInvalidColumnId = -1;

constexpr int32_t kInvalidFieldId = -1;

constexpr int32_t kInitialSpecId = 0;
/// \brief The start ID for partition field. It is only used to generate
/// partition field id for v1 metadata where it is tracked.
constexpr int32_t kLegacyPartitionDataIdStart = 1000;
constexpr int32_t kInvalidPartitionFieldId = -1;

constexpr int32_t kUnsortedOrderId = 0;
constexpr int32_t kInitialSortOrderId = 1;

} // namespace iceberg
15 changes: 7 additions & 8 deletions src/iceberg/json_serde.cc
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ Result<std::unique_ptr<Schema>> SchemaFromJson(const nlohmann::json& json) {
auto identifier_field_ids,
GetJsonValueOrDefault<std::vector<int32_t>>(json, kIdentifierFieldIds));

return Schema::Make(std::move(fields), schema_id_opt.value_or(Schema::kInitialSchemaId),
return Schema::Make(std::move(fields), schema_id_opt.value_or(kInitialSchemaId),
std::move(identifier_field_ids));
}

Expand Down Expand Up @@ -599,8 +599,8 @@ Result<std::unique_ptr<PartitionField>> PartitionFieldFromJson(
int32_t field_id;
if (allow_field_id_missing) {
// Partition field id in v1 is not tracked, so we use -1 to indicate that.
ICEBERG_ASSIGN_OR_RAISE(field_id, GetJsonValueOrDefault<int32_t>(
json, kFieldId, SchemaField::kInvalidFieldId));
ICEBERG_ASSIGN_OR_RAISE(
field_id, GetJsonValueOrDefault<int32_t>(json, kFieldId, kInvalidFieldId));
} else {
ICEBERG_ASSIGN_OR_RAISE(field_id, GetJsonValue<int32_t>(json, kFieldId));
}
Expand Down Expand Up @@ -1001,14 +1001,14 @@ Status ParsePartitionSpecs(const nlohmann::json& json, int8_t format_version,
SafeDumpJson(partition_spec_json));
}

int32_t next_partition_field_id = PartitionSpec::kLegacyPartitionDataIdStart;
int32_t next_partition_field_id = kLegacyPartitionDataIdStart;
std::vector<PartitionField> fields;
for (const auto& entry_json : partition_spec_json) {
ICEBERG_ASSIGN_OR_RAISE(
auto field, PartitionFieldFromJson(
entry_json, /*allow_field_id_missing=*/format_version == 1));
int32_t field_id = field->field_id();
if (field_id == SchemaField::kInvalidFieldId) {
if (field_id == kInvalidFieldId) {
// If the field ID is not set, we need to assign a new one
field_id = next_partition_field_id++;
}
Expand All @@ -1018,9 +1018,8 @@ Status ParsePartitionSpecs(const nlohmann::json& json, int8_t format_version,

// Create partition spec with schema validation
ICEBERG_ASSIGN_OR_RAISE(
auto spec,
PartitionSpec::Make(*current_schema, PartitionSpec::kInitialSpecId,
std::move(fields), /*allow_missing_fields=*/false));
auto spec, PartitionSpec::Make(*current_schema, kInitialSpecId, std::move(fields),
/*allow_missing_fields=*/false));
default_spec_id = spec->spec_id();
partition_specs.push_back(std::move(spec));
}
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/manifest/manifest_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ struct ICEBERG_EXPORT ManifestFile {
/// Field id: 502
/// ID of a partition spec used to write the manifest; must be listed in table metadata
/// partition-specs
int32_t partition_spec_id = PartitionSpec::kInitialSpecId;
int32_t partition_spec_id = kInitialSpecId;
/// Field id: 517
/// The type of files tracked by the manifest, either data or delete files; 0 for all v1
/// manifests
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/partition_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ Result<std::unique_ptr<PartitionSpec>> PartitionSpec::Make(

bool PartitionSpec::HasSequentialFieldIds(const PartitionSpec& spec) {
for (size_t i = 0; i < spec.fields().size(); i += 1) {
if (spec.fields()[i].field_id() != PartitionSpec::kLegacyPartitionDataIdStart + i) {
if (spec.fields()[i].field_id() != kLegacyPartitionDataIdStart + i) {
return false;
}
}
Expand Down
7 changes: 1 addition & 6 deletions src/iceberg/partition_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <unordered_map>
#include <vector>

#include "iceberg/constants.h"
#include "iceberg/iceberg_export.h"
#include "iceberg/partition_field.h"
#include "iceberg/result.h"
Expand All @@ -46,12 +47,6 @@ namespace iceberg {
/// evolution.
class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
public:
static constexpr int32_t kInitialSpecId = 0;
/// \brief The start ID for partition field. It is only used to generate
/// partition field id for v1 metadata where it is tracked.
static constexpr int32_t kLegacyPartitionDataIdStart = 1000;
static constexpr int32_t kInvalidPartitionFieldId = -1;

/// \brief Get an unsorted partition spec singleton.
static const std::shared_ptr<PartitionSpec>& Unpartitioned();

Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ Result<int32_t> SchemaCache::InitHighestFieldId(const Schema* schema) {
ICEBERG_ASSIGN_OR_RAISE(auto id_to_field, InitIdToFieldMap(schema));

if (id_to_field.empty()) {
return Schema::kInitialColumnId;
return kInitialColumnId;
}

auto max_it = std::ranges::max_element(
Expand Down
5 changes: 1 addition & 4 deletions src/iceberg/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <unordered_set>
#include <vector>

#include "iceberg/constants.h"
#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"
#include "iceberg/schema_field.h"
Expand All @@ -48,10 +49,6 @@ class SchemaCache;
/// evolution.
class ICEBERG_EXPORT Schema : public StructType {
public:
static constexpr int32_t kInitialSchemaId = 0;
static constexpr int32_t kInitialColumnId = 0;
static constexpr int32_t kInvalidColumnId = -1;

/// \brief Special value to select all columns from manifest files.
static constexpr std::string_view kAllColumns = "*";

Expand Down
3 changes: 1 addition & 2 deletions src/iceberg/schema_field.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <string>
#include <string_view>

#include "iceberg/constants.h"
#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"
Expand All @@ -38,8 +39,6 @@ namespace iceberg {
/// \brief A type combined with a name.
class ICEBERG_EXPORT SchemaField : public iceberg::util::Formattable {
public:
static constexpr int32_t kInvalidFieldId = -1;

/// \brief Construct a field.
/// \param[in] field_id The field ID.
/// \param[in] name The field name.
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/schema_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ std::unique_ptr<Schema> FromStructType(StructType&& struct_type,
for (auto& field : struct_type.fields()) {
fields.emplace_back(std::move(field));
}
auto schema_id = schema_id_opt.value_or(Schema::kInitialSchemaId);
auto schema_id = schema_id_opt.value_or(kInitialSchemaId);
return std::make_unique<Schema>(std::move(fields), schema_id);
}

Expand Down
4 changes: 1 addition & 3 deletions src/iceberg/sort_order.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <unordered_set>
#include <vector>

#include "iceberg/constants.h"
#include "iceberg/iceberg_export.h"
#include "iceberg/sort_field.h"
#include "iceberg/type_fwd.h"
Expand All @@ -39,9 +40,6 @@ namespace iceberg {
/// applied to the data.
class ICEBERG_EXPORT SortOrder : public util::Formattable {
public:
static constexpr int32_t kUnsortedOrderId = 0;
static constexpr int32_t kInitialSortOrderId = 1;

/// \brief Get an unsorted sort order singleton.
static const std::shared_ptr<SortOrder>& Unsorted();

Expand Down
25 changes: 12 additions & 13 deletions src/iceberg/table_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ Result<std::unique_ptr<PartitionSpec>> FreshPartitionSpec(int32_t spec_id,
const Schema& fresh_schema) {
std::vector<PartitionField> partition_fields;
partition_fields.reserve(spec.fields().size());
int32_t last_partition_field_id = PartitionSpec::kInvalidPartitionFieldId;
int32_t last_partition_field_id = kInvalidPartitionFieldId;
for (auto& field : spec.fields()) {
ICEBERG_ASSIGN_OR_RAISE(auto source_name,
base_schema.FindColumnNameById(field.source_id()));
Expand Down Expand Up @@ -145,7 +145,7 @@ std::vector<std::unique_ptr<TableUpdate>> ChangesForCreate(
if (auto partition_spec_result = metadata.PartitionSpec();
partition_spec_result.has_value()) {
auto spec = partition_spec_result.value();
if (spec && spec->spec_id() != PartitionSpec::kInitialSpecId) {
if (spec && spec->spec_id() != kInitialSpecId) {
changes.push_back(std::make_unique<table::AddPartitionSpec>(spec));
} else {
changes.push_back(
Expand Down Expand Up @@ -205,17 +205,16 @@ Result<std::unique_ptr<TableMetadata>> TableMetadata::Make(
int32_t last_column_id = 0;
auto next_id = [&last_column_id]() -> int32_t { return ++last_column_id; };
ICEBERG_ASSIGN_OR_RAISE(auto fresh_schema,
AssignFreshIds(Schema::kInitialSchemaId, schema, next_id));
AssignFreshIds(kInitialSchemaId, schema, next_id));

// Rebuild the partition spec using the new column ids
ICEBERG_ASSIGN_OR_RAISE(
auto fresh_spec,
FreshPartitionSpec(PartitionSpec::kInitialSpecId, spec, schema, *fresh_schema));
auto fresh_spec, FreshPartitionSpec(kInitialSpecId, spec, schema, *fresh_schema));

// rebuild the sort order using the new column ids
ICEBERG_ASSIGN_OR_RAISE(
auto fresh_order,
FreshSortOrder(SortOrder::kInitialSortOrderId, sort_order, schema, *fresh_schema))
FreshSortOrder(kInitialSortOrderId, sort_order, schema, *fresh_schema))

// Validata the metrics configuration.
ICEBERG_RETURN_UNEXPECTED(
Expand Down Expand Up @@ -549,11 +548,11 @@ class TableMetadataBuilder::Impl {
metadata_.format_version = format_version;
metadata_.last_sequence_number = TableMetadata::kInitialSequenceNumber;
metadata_.last_updated_ms = kInvalidLastUpdatedMs;
metadata_.last_column_id = Schema::kInvalidColumnId;
metadata_.default_spec_id = PartitionSpec::kInitialSpecId;
metadata_.last_partition_id = PartitionSpec::kInvalidPartitionFieldId;
metadata_.last_column_id = kInvalidColumnId;
metadata_.default_spec_id = kInitialSpecId;
metadata_.last_partition_id = kInvalidPartitionFieldId;
metadata_.current_snapshot_id = kInvalidSnapshotId;
metadata_.default_sort_order_id = SortOrder::kInitialSortOrderId;
metadata_.default_sort_order_id = kInitialSortOrderId;
metadata_.next_row_id = TableMetadata::kInitialRowId;
}

Expand Down Expand Up @@ -1367,10 +1366,10 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataBuilder::Impl::Build() {
int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSortOrderId(
const SortOrder& new_order) {
if (new_order.is_unsorted()) {
return SortOrder::kUnsortedOrderId;
return kUnsortedOrderId;
}
// determine the next order id
int32_t new_order_id = SortOrder::kInitialSortOrderId;
int32_t new_order_id = kInitialSortOrderId;
for (const auto& order : metadata_.sort_orders) {
if (order->SameOrder(new_order)) {
return order->order_id();
Expand All @@ -1384,7 +1383,7 @@ int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSortOrderId(
int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewPartitionSpecId(
const PartitionSpec& new_spec) {
// if the spec already exists, use the same ID. otherwise, use the highest ID + 1.
int32_t new_spec_id = PartitionSpec::kInitialSpecId;
int32_t new_spec_id = kInitialSpecId;
for (const auto& spec : metadata_.partition_specs) {
if (new_spec.CompatibleWith(*spec)) {
return spec->spec_id();
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/table_scan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ TableScanBuilder<ScanType>::ResolveSnapshotSchema() {
if (context_.snapshot_id.has_value()) {
ICEBERG_ASSIGN_OR_RAISE(auto snapshot,
metadata_->SnapshotById(*context_.snapshot_id));
int32_t schema_id = snapshot->schema_id.value_or(Schema::kInitialSchemaId);
int32_t schema_id = snapshot->schema_id.value_or(kInitialSchemaId);
ICEBERG_ASSIGN_OR_RAISE(snapshot_schema_, metadata_->SchemaById(schema_id));
} else {
ICEBERG_ASSIGN_OR_RAISE(snapshot_schema_, metadata_->Schema());
Expand Down
10 changes: 5 additions & 5 deletions src/iceberg/test/assign_id_visitor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ Result<std::unique_ptr<Schema>> CreateNestedSchema(
SchemaField::MakeOptional(/*field_id=*/30, "map", CreateMapWithStructValue()),
SchemaField::MakeRequired(/*field_id=*/40, "struct", CreateNestedStruct()),
},
Schema::kInitialSchemaId, std::move(identifier_field_ids));
kInitialSchemaId, std::move(identifier_field_ids));
}

} // namespace
Expand All @@ -94,7 +94,7 @@ TEST(AssignFreshIdVisitorTest, FlatSchema) {
std::atomic<int32_t> id = 0;
auto next_id = [&id]() { return ++id; };
ICEBERG_UNWRAP_OR_FAIL(auto fresh_schema,
AssignFreshIds(Schema::kInitialSchemaId, schema, next_id));
AssignFreshIds(kInitialSchemaId, schema, next_id));

ASSERT_EQ(fresh_schema->fields().size(), schema.fields().size());
EXPECT_EQ(Schema(
Expand All @@ -104,7 +104,7 @@ TEST(AssignFreshIdVisitorTest, FlatSchema) {
SchemaField::MakeOptional(/*field_id=*/3, "age", iceberg::int32()),
SchemaField::MakeRequired(/*field_id=*/4, "data", iceberg::float64()),
},
Schema::kInitialSchemaId),
kInitialSchemaId),
*fresh_schema);
}

Expand All @@ -113,7 +113,7 @@ TEST(AssignFreshIdVisitorTest, NestedSchema) {
std::atomic<int32_t> id = 0;
auto next_id = [&id]() { return ++id; };
ICEBERG_UNWRAP_OR_FAIL(auto fresh_schema,
AssignFreshIds(Schema::kInitialSchemaId, *schema, next_id));
AssignFreshIds(kInitialSchemaId, *schema, next_id));

ASSERT_EQ(4, fresh_schema->fields().size());
for (int32_t i = 0; i < fresh_schema->fields().size(); ++i) {
Expand Down Expand Up @@ -175,7 +175,7 @@ TEST(AssignFreshIdVisitorTest, RefreshIdentifierId) {

ICEBERG_UNWRAP_OR_FAIL(auto schema, CreateNestedSchema({10, 301}));
ICEBERG_UNWRAP_OR_FAIL(auto fresh_schema,
AssignFreshIds(Schema::kInitialSchemaId, *schema, next_id));
AssignFreshIds(kInitialSchemaId, *schema, next_id));
EXPECT_THAT(fresh_schema->IdentifierFieldIds(), testing::ElementsAre(1, 12));
ICEBERG_UNWRAP_OR_FAIL(auto identifier_field_names,
fresh_schema->IdentifierFieldNames());
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/test/data_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ TEST_F(EqualityDeleteWriterTest, MetadataAfterClose) {

// Partition spec id must be set
ASSERT_TRUE(data_file->partition_spec_id.has_value());
EXPECT_EQ(data_file->partition_spec_id.value(), PartitionSpec::kInitialSpecId);
EXPECT_EQ(data_file->partition_spec_id.value(), kInitialSpecId);

// Equality field ids must be set
ASSERT_EQ(data_file->equality_ids.size(), 2);
Expand Down
4 changes: 2 additions & 2 deletions src/iceberg/test/location_provider_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ TEST_F(LocationProviderTest, ObjectStorageWithPartition) {

ICEBERG_UNWRAP_OR_FAIL(
auto mock_spec,
PartitionSpec::Make(PartitionSpec::kInitialSpecId,
PartitionSpec::Make(kInitialSpecId,
{PartitionField(1, 1, "data#1", Transform::Identity())},
PartitionSpec::kInvalidPartitionFieldId + 1));
kInvalidPartitionFieldId + 1));
PartitionValues mock_partition_data({Literal::String("val#1")});
ICEBERG_UNWRAP_OR_FAIL(
auto location,
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/test/manifest_evaluator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class ManifestEvaluatorTest : public ::testing::Test {

std::vector<PartitionField> BuildIdentityFields() {
std::vector<PartitionField> fields;
int32_t partition_field_id = PartitionSpec::kLegacyPartitionDataIdStart;
int32_t partition_field_id = kLegacyPartitionDataIdStart;
auto add_field = [&](int32_t source_id, std::string name) {
fields.emplace_back(source_id, partition_field_id++, std::move(name),
Transform::Identity());
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/test/metadata_serde_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ TEST(MetadataSerdeTest, DeserializeV1Valid) {
.last_updated_ms = TimePointMsFromUnixMs(1602638573874),
.last_column_id = 3,
.schemas = {expected_schema},
.current_schema_id = Schema::kInitialSchemaId,
.current_schema_id = kInitialSchemaId,
.partition_specs = {expected_spec},
.default_spec_id = 0,
.last_partition_id = 1000,
Expand Down
Loading
Loading