Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ set(ICEBERG_SOURCES
manifest/manifest_group.cc
manifest/manifest_list.cc
manifest/manifest_reader.cc
manifest/manifest_util_internal.cc
Comment thread
zhjwpku marked this conversation as resolved.
Outdated
manifest/manifest_writer.cc
manifest/rolling_manifest_writer.cc
manifest/v1_metadata.cc
Expand Down Expand Up @@ -85,6 +86,7 @@ set(ICEBERG_SOURCES
transform_function.cc
type.cc
update/expire_snapshots.cc
update/fast_append.cc
update/pending_update.cc
update/snapshot_update.cc
update/update_location.cc
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ namespace iceberg {

constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id";
constexpr int64_t kInvalidSnapshotId = -1;
constexpr int64_t kInvalidSequenceNumber = -1;
/// \brief Stand-in for the current sequence number that will be assigned when the commit
/// is successful. This is replaced when writing a manifest list by the ManifestFile
/// adapter.
Expand Down
75 changes: 75 additions & 0 deletions src/iceberg/manifest/manifest_util_internal.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/manifest/manifest_util_internal.h"

#include <memory>
#include <optional>

#include "iceberg/inheritable_metadata.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_reader.h"
#include "iceberg/manifest/manifest_writer.h"
#include "iceberg/result.h"
#include "iceberg/schema.h"
#include "iceberg/snapshot.h"
#include "iceberg/util/macros.h"

namespace iceberg {

Result<ManifestFile> CopyAppendManifest(
const ManifestFile& manifest, std::shared_ptr<FileIO> file_io,
std::shared_ptr<Schema> schema, std::shared_ptr<PartitionSpec> spec,
int64_t snapshot_id, const std::string& output_path, int8_t format_version,
SnapshotSummaryBuilder* summary_builder) {
ICEBERG_ASSIGN_OR_RAISE(auto reader,
ManifestReader::Make(manifest, file_io, schema, spec));
Comment thread
wgtmac marked this conversation as resolved.
Outdated
ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());

// use metadata that will add the current snapshot's ID for the rewrite
ICEBERG_ASSIGN_OR_RAISE(auto inheritable_metadata,
InheritableMetadataFactory::ForCopy(snapshot_id));

// do not produce row IDs for the copy
ICEBERG_ASSIGN_OR_RAISE(
auto writer,
ManifestWriter::MakeWriter(format_version, snapshot_id, output_path, file_io, spec,
schema, ManifestContent::kData));

// Write all entries as added entries with the new snapshot ID
for (auto& entry : entries) {
ICEBERG_PRECHECK(entry.status == ManifestStatus::kAdded,
"Manifest to copy must only contain added entries");

ICEBERG_RETURN_UNEXPECTED(inheritable_metadata->Apply(entry));

if (summary_builder != nullptr && entry.data_file != nullptr) {
ICEBERG_RETURN_UNEXPECTED(summary_builder->AddedFile(*spec, *entry.data_file));
}

ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(entry));
}

ICEBERG_RETURN_UNEXPECTED(writer->Close());
ICEBERG_ASSIGN_OR_RAISE(auto new_manifest, writer->ToManifestFile());

return new_manifest;
Comment thread
zhjwpku marked this conversation as resolved.
Outdated
}

} // namespace iceberg
56 changes: 56 additions & 0 deletions src/iceberg/manifest/manifest_util_internal.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

/// \file iceberg/manifest/manifest_util_internal.h
/// Internal utility functions for manifest operations.

#include <cstdint>
#include <memory>
#include <string>

#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"

namespace iceberg {

/// \brief Copy an append manifest with a new snapshot ID.
///
/// This function copies a manifest file that contains only ADDED entries,
/// rewriting it with a new snapshot ID. This is similar to Java's
/// ManifestFiles.copyAppendManifest.
///
/// \param manifest The manifest file to copy
/// \param file_io File IO implementation to use
/// \param schema Table schema
/// \param spec Partition spec for the manifest
/// \param snapshot_id The new snapshot ID to assign to entries
/// \param output_path Path where the new manifest will be written
/// \param format_version Table format version
/// \param summary_builder Optional summary builder to update with file metrics
/// \return The copied manifest file, or an error
ICEBERG_EXPORT Result<ManifestFile> CopyAppendManifest(
const ManifestFile& manifest, std::shared_ptr<FileIO> file_io,
std::shared_ptr<Schema> schema, std::shared_ptr<PartitionSpec> spec,
Comment thread
zhjwpku marked this conversation as resolved.
Outdated
int64_t snapshot_id, const std::string& output_path, int8_t format_version,
SnapshotSummaryBuilder* summary_builder = nullptr);

} // namespace iceberg
2 changes: 2 additions & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ iceberg_sources = files(
'manifest/manifest_group.cc',
'manifest/manifest_list.cc',
'manifest/manifest_reader.cc',
'manifest/manifest_util_internal.cc',
'manifest/manifest_writer.cc',
'manifest/rolling_manifest_writer.cc',
'manifest/v1_metadata.cc',
Expand Down Expand Up @@ -103,6 +104,7 @@ iceberg_sources = files(
'transform_function.cc',
'type.cc',
'update/expire_snapshots.cc',
'update/fast_append.cc',
'update/pending_update.cc',
'update/snapshot_update.cc',
'update/update_location.cc',
Expand Down
7 changes: 7 additions & 0 deletions src/iceberg/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,13 @@ Result<std::shared_ptr<UpdateLocation>> Table::NewUpdateLocation() {
return transaction->NewUpdateLocation();
}

Result<std::shared_ptr<FastAppend>> Table::NewFastAppend() {
ICEBERG_ASSIGN_OR_RAISE(
auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
/*auto_commit=*/true));
return transaction->NewFastAppend();
}

Result<std::shared_ptr<StagedTable>> StagedTable::Make(
TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
Expand Down
3 changes: 3 additions & 0 deletions src/iceberg/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
/// changes.
virtual Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();

/// \brief Create a new FastAppend to append data files and commit the changes.
virtual Result<std::shared_ptr<FastAppend>> NewFastAppend();

protected:
Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
Expand Down
1 change: 0 additions & 1 deletion src/iceberg/table_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ struct ICEBERG_EXPORT TableMetadata {
static constexpr int8_t kMinFormatVersionRowLineage = 3;
static constexpr int8_t kMinFormatVersionDefaultValues = 3;
static constexpr int64_t kInitialSequenceNumber = 0;
static constexpr int64_t kInvalidSequenceNumber = -1;
static constexpr int64_t kInitialRowId = 0;

static inline const std::unordered_map<TypeId, int8_t> kMinFormatVersions = {};
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ if(ICEBERG_BUILD_BUNDLE)
USE_BUNDLE
SOURCES
expire_snapshots_test.cc
fast_append_test.cc
transaction_test.cc
update_location_test.cc
update_partition_spec_test.cc
Expand Down
Loading
Loading