Skip to content

Commit 17ab0c3

Browse files
authored
feat: introduce scan.tag-name option to specify scanning from tag for reading given tag (#85)
1 parent d1afa9b commit 17ab0c3

24 files changed

Lines changed: 786 additions & 5 deletions

include/paimon/defs.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,8 @@ struct PAIMON_EXPORT Options {
287287
/// "global-index.external-path" - Global index root directory, if not set, the global index
288288
/// files will be stored under the index directory.
289289
static const char GLOBAL_INDEX_EXTERNAL_PATH[];
290+
/// "scan.tag-name" - Optional tag name used in case of "from-snapshot" scan mode.
291+
static const char SCAN_TAG_NAME[];
290292
};
291293

292294
static constexpr int64_t BATCH_WRITE_COMMIT_IDENTIFIER = std::numeric_limits<int64_t>::max();

src/paimon/CMakeLists.txt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,7 @@ set(PAIMON_CORE_SRCS
265265
core/table/source/table_read.cpp
266266
core/table/source/table_scan.cpp
267267
core/table/source/data_evolution_batch_scan.cpp
268+
core/tag/tag.cpp
268269
core/utils/field_mapping.cpp
269270
core/utils/fields_comparator.cpp
270271
core/utils/file_store_path_factory.cpp
@@ -273,7 +274,8 @@ set(PAIMON_CORE_SRCS
273274
core/utils/partition_path_utils.cpp
274275
core/utils/primary_key_table_utils.cpp
275276
core/utils/snapshot_manager.cpp
276-
core/utils/special_field_ids.cpp)
277+
core/utils/special_field_ids.cpp
278+
core/utils/tag_manager.cpp)
277279

278280
add_paimon_lib(paimon
279281
SOURCES
@@ -564,6 +566,7 @@ if(PAIMON_BUILD_TESTS)
564566
core/table/source/split_generator_test.cpp
565567
core/table/source/startup_mode_test.cpp
566568
core/table/source/table_scan_test.cpp
569+
core/tag/tag_test.cpp
567570
core/utils/branch_manager_test.cpp
568571
core/utils/field_mapping_test.cpp
569572
core/utils/fields_comparator_test.cpp
@@ -573,6 +576,7 @@ if(PAIMON_BUILD_TESTS)
573576
core/utils/offset_row_test.cpp
574577
core/utils/partition_path_utils_test.cpp
575578
core/utils/snapshot_manager_test.cpp
579+
core/utils/tag_manager_test.cpp
576580
core/utils/primary_key_table_utils_test.cpp
577581
core/utils/index_file_path_factories_test.cpp
578582
STATIC_LINK_LIBS

src/paimon/common/defs.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,5 @@ const char Options::PARTITION_GENERATE_LEGACY_NAME[] = "partition.legacy-name";
8181
const char Options::BLOB_AS_DESCRIPTOR[] = "blob-as-descriptor";
8282
const char Options::GLOBAL_INDEX_ENABLED[] = "global-index.enabled";
8383
const char Options::GLOBAL_INDEX_EXTERNAL_PATH[] = "global-index.external-path";
84+
const char Options::SCAN_TAG_NAME[] = "scan.tag-name";
8485
} // namespace paimon

src/paimon/core/core_options.cpp

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,8 @@ struct CoreOptions::Impl {
304304
bool legacy_partition_name_enabled = true;
305305
bool global_index_enabled = true;
306306
std::optional<std::string> global_index_external_path;
307+
308+
std::optional<std::string> scan_tag_name;
307309
};
308310

309311
// Parse configurations from a map and return a populated CoreOptions object
@@ -476,6 +478,12 @@ Result<CoreOptions> CoreOptions::FromMap(
476478
if (!global_index_external_path.empty()) {
477479
impl->global_index_external_path = global_index_external_path;
478480
}
481+
// Parse scan.tag-name
482+
std::string scan_tag_name;
483+
PAIMON_RETURN_NOT_OK(parser.ParseString(Options::SCAN_TAG_NAME, &scan_tag_name));
484+
if (!scan_tag_name.empty()) {
485+
impl->scan_tag_name = scan_tag_name;
486+
}
479487

480488
return options;
481489
}
@@ -561,7 +569,7 @@ const std::string& CoreOptions::GetManifestCompression() const {
561569

562570
StartupMode CoreOptions::GetStartupMode() const {
563571
if (impl_->startup_mode == StartupMode::Default()) {
564-
if (GetScanSnapshotId() != std::nullopt) {
572+
if (GetScanSnapshotId() != std::nullopt || GetScanTagName() != std::nullopt) {
565573
return StartupMode::FromSnapshot();
566574
}
567575
return StartupMode::LatestFull();
@@ -772,4 +780,8 @@ Result<std::optional<std::string>> CoreOptions::CreateGlobalIndexExternalPath()
772780
return std::optional<std::string>(path.ToString());
773781
}
774782

783+
std::optional<std::string> CoreOptions::GetScanTagName() const {
784+
return impl_->scan_tag_name;
785+
}
786+
775787
} // namespace paimon

src/paimon/core/core_options.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ class PAIMON_EXPORT CoreOptions {
118118
bool GlobalIndexEnabled() const;
119119
Result<std::optional<std::string>> CreateGlobalIndexExternalPath() const;
120120

121+
std::optional<std::string> GetScanTagName() const;
122+
121123
const std::map<std::string, std::string>& ToMap() const;
122124

123125
private:

src/paimon/core/core_options_test.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ TEST(CoreOptionsTest, TestDefaultValue) {
8888
ASSERT_TRUE(core_options.LegacyPartitionNameEnabled());
8989
ASSERT_TRUE(core_options.GlobalIndexEnabled());
9090
ASSERT_FALSE(core_options.GetGlobalIndexExternalPath());
91+
ASSERT_EQ(std::nullopt, core_options.GetScanTagName());
9192
}
9293

9394
TEST(CoreOptionsTest, TestFromMap) {
@@ -146,6 +147,7 @@ TEST(CoreOptionsTest, TestFromMap) {
146147
{Options::PARTITION_GENERATE_LEGACY_NAME, "false"},
147148
{Options::GLOBAL_INDEX_ENABLED, "false"},
148149
{Options::GLOBAL_INDEX_EXTERNAL_PATH, "FILE:///tmp/global_index/"},
150+
{Options::SCAN_TAG_NAME, "test-tag"},
149151
};
150152

151153
ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options));
@@ -216,6 +218,8 @@ TEST(CoreOptionsTest, TestFromMap) {
216218
ASSERT_FALSE(core_options.GlobalIndexEnabled());
217219
ASSERT_TRUE(core_options.GetGlobalIndexExternalPath());
218220
ASSERT_EQ(core_options.GetGlobalIndexExternalPath().value(), "FILE:///tmp/global_index/");
221+
ASSERT_EQ("test-tag", core_options.GetScanTagName().value());
222+
ASSERT_EQ(StartupMode::FromSnapshot(), core_options.GetStartupMode());
219223
}
220224

221225
TEST(CoreOptionsTest, TestInvalidCase) {

src/paimon/core/table/source/abstract_table_scan.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "paimon/core/table/source/snapshot/full_starting_scanner.h"
2626
#include "paimon/core/table/source/snapshot/snapshot_reader.h"
2727
#include "paimon/core/table/source/snapshot/static_from_snapshot_starting_scanner.h"
28+
#include "paimon/core/table/source/snapshot/static_from_tag_starting_scanner.h"
2829
#include "paimon/table/source/startup_mode.h"
2930
#include "paimon/table/source/table_scan.h"
3031
namespace paimon {
@@ -51,16 +52,24 @@ class AbstractTableScan : public TableScan {
5152
return std::shared_ptr<StartingScanner>(new FullStartingScanner(snapshot_manager));
5253
}
5354
} else if (startup_mode == StartupMode::FromSnapshot()) {
55+
const std::optional<std::string> scan_tag_name = core_options_.GetScanTagName();
5456
if (specified_snapshot_id != std::nullopt) {
5557
return is_streaming
5658
? std::shared_ptr<StartingScanner>(
5759
new ContinuousFromSnapshotStartingScanner(
5860
snapshot_manager, specified_snapshot_id.value()))
5961
: std::shared_ptr<StartingScanner>(new StaticFromSnapshotStartingScanner(
6062
snapshot_manager, specified_snapshot_id.value()));
63+
} else if (scan_tag_name != std::nullopt) {
64+
if (is_streaming) {
65+
return Status::Invalid("Cannot scan from tag in streaming mode");
66+
}
67+
return std::make_shared<StaticFromTagStartingScanner>(snapshot_manager,
68+
scan_tag_name.value());
6169
} else {
6270
return Status::Invalid(
63-
"scan.snapshot-id must be set when startup mode is FROM_SNAPSHOT");
71+
"scan.snapshot-id or scan.tag-name must be set when startup mode is "
72+
"FROM_SNAPSHOT");
6473
}
6574
} else if (startup_mode == StartupMode::FromSnapshotFull()) {
6675
if (specified_snapshot_id != std::nullopt) {
src/paimon/core/table/source/snapshot/static_from_tag_starting_scanner.h

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2026-present Alibaba Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include <memory>
20+
21+
#include "paimon/core/table/source/snapshot/starting_scanner.h"
22+
#include "paimon/core/utils/tag_manager.h"
23+
24+
namespace paimon {
25+
/// `StartingScanner` for the `CoreOptions::GetScanTagName()` of a batch read.
26+
class StaticFromTagStartingScanner : public StartingScanner {
27+
public:
28+
StaticFromTagStartingScanner(const std::shared_ptr<SnapshotManager>& snapshot_manager,
29+
const std::string& tag_name)
30+
: StartingScanner(snapshot_manager) {
31+
tag_name_ = tag_name;
32+
}
33+
34+
Result<std::shared_ptr<ScanResult>> Scan(
35+
const std::shared_ptr<SnapshotReader>& snapshot_reader) override {
36+
const TagManager tag_manager(snapshot_manager_->Fs(), snapshot_manager_->RootPath());
37+
PAIMON_ASSIGN_OR_RAISE(const Tag tag, tag_manager.GetOrThrow(tag_name_));
38+
PAIMON_ASSIGN_OR_RAISE(const Snapshot snapshot, tag.TrimToSnapshot());
39+
PAIMON_ASSIGN_OR_RAISE(
40+
std::shared_ptr<Plan> plan,
41+
snapshot_reader->WithMode(ScanMode::ALL)->WithSnapshot(snapshot)->Read());
42+
return std::make_shared<StartingScanner::CurrentSnapshot>(plan);
43+
}
44+
45+
private:
46+
std::string tag_name_;
47+
};
48+
} // namespace paimon

src/paimon/core/tag/tag.cpp

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Copyright 2026-present Alibaba Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "paimon/core/tag/tag.h"
18+
19+
#include <cassert>
20+
21+
#include "paimon/common/utils/rapidjson_util.h"
22+
#include "paimon/fs/file_system.h"
23+
#include "paimon/result.h"
24+
#include "paimon/status.h"
25+
#include "rapidjson/document.h"
26+
#include "rapidjson/rapidjson.h"
27+
28+
namespace paimon {
29+
30+
/// Builds a tag from the complete set of snapshot fields plus the two tag-only fields.
///
/// Every argument up to and including `next_row_id` is forwarded unchanged, in the same
/// order, to the `Snapshot` base constructor. `tag_create_time` and `tag_time_retained`
/// are stored in the tag itself.
Tag::Tag(const std::optional<int32_t>& version,
         const int64_t id,
         const int64_t schema_id,
         const std::string& base_manifest_list,
         const std::optional<int64_t>& base_manifest_list_size,
         const std::string& delta_manifest_list,
         const std::optional<int64_t>& delta_manifest_list_size,
         const std::optional<std::string>& changelog_manifest_list,
         const std::optional<int64_t>& changelog_manifest_list_size,
         const std::optional<std::string>& index_manifest,
         const std::string& commit_user,
         const int64_t commit_identifier,
         const CommitKind commit_kind,
         const int64_t time_millis,
         const std::optional<std::map<int32_t, int64_t>>& log_offsets,
         const std::optional<int64_t>& total_record_count,
         const std::optional<int64_t>& delta_record_count,
         const std::optional<int64_t>& changelog_record_count,
         const std::optional<int64_t>& watermark,
         const std::optional<std::string>& statistics,
         const std::optional<std::map<std::string, std::string>>& properties,
         const std::optional<int64_t>& next_row_id,
         const std::optional<std::vector<int64_t>>& tag_create_time,
         const std::optional<double_t>& tag_time_retained)
    : Snapshot(version,
               id,
               schema_id,
               base_manifest_list,
               base_manifest_list_size,
               delta_manifest_list,
               delta_manifest_list_size,
               changelog_manifest_list,
               changelog_manifest_list_size,
               index_manifest,
               commit_user,
               commit_identifier,
               commit_kind,
               time_millis,
               log_offsets,
               total_record_count,
               delta_record_count,
               changelog_record_count,
               watermark,
               statistics,
               properties,
               next_row_id),
      tag_create_time_(tag_create_time),
      tag_time_retained_(tag_time_retained) {}
55+
56+
/// Equality: the `Snapshot` parts must compare equal via `Snapshot::operator==`, and both
/// tag-only fields (`tag_create_time_`, `tag_time_retained_`) must match.
bool Tag::operator==(const Tag& other) const {
    // Same object: nothing to compare.
    if (this == &other) {
        return true;
    }
    if (!Snapshot::operator==(other)) {
        return false;
    }
    if (tag_create_time_ != other.tag_create_time_) {
        return false;
    }
    return tag_time_retained_ == other.tag_time_retained_;
}
63+
64+
bool Tag::TEST_Equal(const Tag& other) const {
65+
if (this == &other) {
66+
return true;
67+
}
68+
69+
return Snapshot::TEST_Equal(other) && tag_create_time_ == other.tag_create_time_ &&
70+
tag_time_retained_ == other.tag_time_retained_;
71+
}
72+
73+
/// Produces a plain `Snapshot` built from this tag's snapshot accessors, leaving out the
/// tag-only fields (`tag_create_time_`, `tag_time_retained_`).
///
/// @return The new snapshot wrapped in a `Result`; no error path is taken in this function.
Result<Snapshot> Tag::TrimToSnapshot() const {
    return Snapshot(
        // identity
        Version(), Id(), SchemaId(),
        // manifest lists and index manifest
        BaseManifestList(), BaseManifestListSize(),
        DeltaManifestList(), DeltaManifestListSize(),
        ChangelogManifestList(), ChangelogManifestListSize(),
        IndexManifest(),
        // commit information
        CommitUser(), CommitIdentifier(), GetCommitKind(), TimeMillis(), LogOffsets(),
        // record counts
        TotalRecordCount(), DeltaRecordCount(), ChangelogRecordCount(),
        // remaining snapshot fields
        Watermark(), Statistics(), Properties(), NextRowId());
}
81+
82+
/// Serialises this tag to a JSON value.
///
/// Starts from the JSON produced by `Snapshot::ToJson` and adds the tag-only members
/// `FIELD_TAG_CREATE_TIME` and `FIELD_TAG_TIME_RETAINED`, each only when the
/// corresponding optional holds a value.
///
/// @param allocator Allocator passed to the base serialiser, to
///        `RapidJsonUtil::SerializeValue` and to `AddMember`.
/// @return The JSON value for this tag. Declared `noexcept(false)`.
rapidjson::Value Tag::ToJson(rapidjson::Document::AllocatorType* allocator) const noexcept(false) {
    // Initialise directly from the base result; the previous empty object was
    // overwritten immediately and never used.
    rapidjson::Value obj = Snapshot::ToJson(allocator);
    if (tag_create_time_.has_value()) {
        obj.AddMember(rapidjson::StringRef(FIELD_TAG_CREATE_TIME),
                      RapidJsonUtil::SerializeValue(*tag_create_time_, allocator).Move(),
                      *allocator);
    }
    if (tag_time_retained_.has_value()) {
        obj.AddMember(rapidjson::StringRef(FIELD_TAG_TIME_RETAINED),
                      RapidJsonUtil::SerializeValue(*tag_time_retained_, allocator).Move(),
                      *allocator);
    }
    return obj;
}
97+
98+
/// Populates this tag from a JSON value.
///
/// The snapshot fields are loaded first by `Snapshot::FromJson`; then the two tag-only
/// fields are read from the keys `FIELD_TAG_CREATE_TIME` and `FIELD_TAG_TIME_RETAINED`.
/// Both are deserialised as optionals — presumably a missing key yields an empty
/// optional (depends on `RapidJsonUtil::DeserializeKeyValue`; confirm).
///
/// @param obj JSON value to read from. Declared `noexcept(false)`.
void Tag::FromJson(const rapidjson::Value& obj) noexcept(false) {
    using CreateTimeField = std::optional<std::vector<int64_t>>;
    using TimeRetainedField = std::optional<double_t>;

    Snapshot::FromJson(obj);
    tag_create_time_ =
        RapidJsonUtil::DeserializeKeyValue<CreateTimeField>(obj, FIELD_TAG_CREATE_TIME);
    tag_time_retained_ =
        RapidJsonUtil::DeserializeKeyValue<TimeRetainedField>(obj, FIELD_TAG_TIME_RETAINED);
}
105+
106+
/// Loads a tag from a JSON file.
///
/// @param fs File system used to read the file.
/// @param path Location of the tag file.
/// @return The parsed tag. A failing read or a failing JSON parse goes through
///         `PAIMON_RETURN_NOT_OK`, which presumably returns that status to the caller
///         (macro is defined elsewhere; confirm).
Result<Tag> Tag::FromPath(const std::shared_ptr<FileSystem>& fs, const std::string& path) {
    // Read the whole file into memory before parsing.
    std::string file_content;
    PAIMON_RETURN_NOT_OK(fs->ReadFile(path, &file_content));

    Tag parsed_tag;
    PAIMON_RETURN_NOT_OK(RapidJsonUtil::FromJsonString(file_content, &parsed_tag));
    return parsed_tag;
}
113+
} // namespace paimon

0 commit comments

Comments
 (0)