Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ set(ICEBERG_SOURCES
manifest/manifest_group.cc
manifest/manifest_list.cc
manifest/manifest_reader.cc
manifest/manifest_util.cc
manifest/manifest_writer.cc
manifest/rolling_manifest_writer.cc
manifest/v1_metadata.cc
Expand Down Expand Up @@ -85,6 +86,7 @@ set(ICEBERG_SOURCES
transform_function.cc
type.cc
update/expire_snapshots.cc
update/fast_append.cc
update/pending_update.cc
update/snapshot_update.cc
update/update_location.cc
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ namespace iceberg {

constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id";
constexpr int64_t kInvalidSnapshotId = -1;
constexpr int64_t kInvalidSequenceNumber = -1;
/// \brief Stand-in for the current sequence number that will be assigned when the commit
/// is successful. This is replaced when writing a manifest list by the ManifestFile
/// adapter.
Expand Down
18 changes: 18 additions & 0 deletions src/iceberg/manifest/manifest_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "iceberg/expression/expression.h"
#include "iceberg/expression/projections.h"
#include "iceberg/file_format.h"
#include "iceberg/inheritable_metadata.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_list.h"
#include "iceberg/manifest/manifest_reader_internal.h"
Expand Down Expand Up @@ -1012,6 +1013,23 @@ Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
std::move(spec), std::move(inheritable_metadata), std::nullopt);
}

Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
const ManifestFile& manifest, std::shared_ptr<FileIO> file_io,
std::shared_ptr<Schema> schema, std::shared_ptr<PartitionSpec> spec,
std::unique_ptr<InheritableMetadata> inheritable_metadata,
std::optional<int64_t> first_row_id) {
if (file_io == nullptr || schema == nullptr || spec == nullptr ||
inheritable_metadata == nullptr) {
return InvalidArgument(
"FileIO, Schema, PartitionSpec, and InheritableMetadata cannot be null to create "
"ManifestReader");
}

return std::make_unique<ManifestReaderImpl>(
manifest.manifest_path, manifest.manifest_length, std::move(file_io),
std::move(schema), std::move(spec), std::move(inheritable_metadata), first_row_id);
}

Result<std::unique_ptr<ManifestListReader>> ManifestListReader::Make(
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
std::shared_ptr<Schema> schema = ManifestFile::Type()->ToSchema();
Expand Down
18 changes: 18 additions & 0 deletions src/iceberg/manifest/manifest_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
/// \file iceberg/manifest/manifest_reader.h
/// Data reader interface for manifest files.

#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>
Expand Down Expand Up @@ -100,6 +102,22 @@ class ICEBERG_EXPORT ManifestReader {
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
std::shared_ptr<Schema> schema, std::shared_ptr<PartitionSpec> spec);

/// \brief Creates a reader for a manifest file with explicit inheritable metadata.
/// \param manifest A ManifestFile object containing metadata about the manifest.
/// \param file_io File IO implementation to use.
/// \param schema Schema used to bind the partition type.
/// \param spec Partition spec used for this manifest file.
/// \param inheritable_metadata Inheritable metadata to use (instead of extracting from
/// manifest).
/// \param first_row_id First row ID to use (nullopt to clear first_row_id from
/// entries).
/// \return A Result containing the reader or an error.
static Result<std::unique_ptr<ManifestReader>> Make(
const ManifestFile& manifest, std::shared_ptr<FileIO> file_io,
std::shared_ptr<Schema> schema, std::shared_ptr<PartitionSpec> spec,
std::unique_ptr<InheritableMetadata> inheritable_metadata,
std::optional<int64_t> first_row_id);

/// \brief Add stats columns to the column list if needed.
static std::vector<std::string> WithStatsColumns(
const std::vector<std::string>& columns);
Expand Down
69 changes: 69 additions & 0 deletions src/iceberg/manifest/manifest_util.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include <memory>
#include <optional>

#include "iceberg/inheritable_metadata.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_reader.h"
#include "iceberg/manifest/manifest_util_internal.h"
#include "iceberg/manifest/manifest_writer.h"
#include "iceberg/result.h"
#include "iceberg/schema.h"
#include "iceberg/snapshot.h"
#include "iceberg/util/macros.h"

namespace iceberg {

Result<ManifestFile> CopyAppendManifest(
const ManifestFile& manifest, const std::shared_ptr<FileIO>& file_io,
const std::shared_ptr<Schema>& schema, const std::shared_ptr<PartitionSpec>& spec,
int64_t snapshot_id, const std::string& output_path, int8_t format_version,
SnapshotSummaryBuilder* summary_builder) {
// use metadata that will add the current snapshot's ID for the rewrite
// read first_row_id as null because this copies the incoming manifest before commit
ICEBERG_ASSIGN_OR_RAISE(auto inheritable_metadata,
InheritableMetadataFactory::ForCopy(snapshot_id));
ICEBERG_ASSIGN_OR_RAISE(
auto reader, ManifestReader::Make(manifest, file_io, schema, spec,
std::move(inheritable_metadata), std::nullopt));
ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());

// do not produce row IDs for the copy
ICEBERG_ASSIGN_OR_RAISE(
auto writer, ManifestWriter::MakeWriter(
format_version, snapshot_id, output_path, file_io, spec, schema,
ManifestContent::kData, /*first_row_id*/ std::nullopt));

for (auto& entry : entries) {
ICEBERG_CHECK(entry.status == ManifestStatus::kAdded,
"Manifest to copy must only contain added entries");
if (summary_builder != nullptr && entry.data_file != nullptr) {
ICEBERG_RETURN_UNEXPECTED(summary_builder->AddedFile(*spec, *entry.data_file));
}

ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(entry));
}

ICEBERG_RETURN_UNEXPECTED(writer->Close());
return writer->ToManifestFile();
}

} // namespace iceberg
56 changes: 56 additions & 0 deletions src/iceberg/manifest/manifest_util_internal.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

/// \file iceberg/manifest/manifest_util_internal.h
/// Internal utility functions for manifest operations.

#include <cstdint>
#include <memory>
#include <string>

#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"

namespace iceberg {

/// \brief Copy an append manifest with a new snapshot ID.
///
/// This function copies a manifest file that contains only ADDED entries,
/// rewriting it with a new snapshot ID. This is similar to Java's
/// ManifestFiles.copyAppendManifest.
///
/// \param manifest The manifest file to copy
/// \param file_io File IO implementation to use
/// \param schema Table schema
/// \param spec Partition spec for the manifest
/// \param snapshot_id The new snapshot ID to assign to entries
/// \param output_path Path where the new manifest will be written
/// \param format_version Table format version
/// \param summary_builder Optional summary builder to update with file metrics
/// \return The copied manifest file, or an error
ICEBERG_EXPORT Result<ManifestFile> CopyAppendManifest(
const ManifestFile& manifest, const std::shared_ptr<FileIO>& file_io,
const std::shared_ptr<Schema>& schema, const std::shared_ptr<PartitionSpec>& spec,
int64_t snapshot_id, const std::string& output_path, int8_t format_version,
SnapshotSummaryBuilder* summary_builder = nullptr);

} // namespace iceberg
2 changes: 2 additions & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ iceberg_sources = files(
'manifest/manifest_group.cc',
'manifest/manifest_list.cc',
'manifest/manifest_reader.cc',
'manifest/manifest_util.cc',
'manifest/manifest_writer.cc',
'manifest/rolling_manifest_writer.cc',
'manifest/v1_metadata.cc',
Expand Down Expand Up @@ -103,6 +104,7 @@ iceberg_sources = files(
'transform_function.cc',
'type.cc',
'update/expire_snapshots.cc',
'update/fast_append.cc',
'update/pending_update.cc',
'update/snapshot_update.cc',
'update/update_location.cc',
Expand Down
7 changes: 7 additions & 0 deletions src/iceberg/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,13 @@ Result<std::shared_ptr<UpdateLocation>> Table::NewUpdateLocation() {
return transaction->NewUpdateLocation();
}

Result<std::shared_ptr<FastAppend>> Table::NewFastAppend() {
ICEBERG_ASSIGN_OR_RAISE(
auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
/*auto_commit=*/true));
return transaction->NewFastAppend();
}

Result<std::shared_ptr<StagedTable>> StagedTable::Make(
TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
Expand Down
3 changes: 3 additions & 0 deletions src/iceberg/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
/// changes.
virtual Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();

/// \brief Create a new FastAppend to append data files and commit the changes.
virtual Result<std::shared_ptr<FastAppend>> NewFastAppend();

protected:
Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
Expand Down
1 change: 0 additions & 1 deletion src/iceberg/table_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ struct ICEBERG_EXPORT TableMetadata {
static constexpr int8_t kMinFormatVersionRowLineage = 3;
static constexpr int8_t kMinFormatVersionDefaultValues = 3;
static constexpr int64_t kInitialSequenceNumber = 0;
static constexpr int64_t kInvalidSequenceNumber = -1;
static constexpr int64_t kInitialRowId = 0;

static inline const std::unordered_map<TypeId, int8_t> kMinFormatVersions = {};
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ if(ICEBERG_BUILD_BUNDLE)
USE_BUNDLE
SOURCES
expire_snapshots_test.cc
fast_append_test.cc
transaction_test.cc
update_location_test.cc
update_partition_spec_test.cc
Expand Down
Loading
Loading