Skip to content

Commit dc6a3a6

Browse files
committed
[feature](tso) Add global monotonically increasing Timestamp Oracle (TSO)
Signed-off-by: Jingzhe Jia <AntiTopQuark1350@outlook.com>
1 parent 9383ec6 commit dc6a3a6

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+2050
-86
lines changed

be/src/agent/task_worker_pool.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2029,8 +2029,8 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest&
20292029

20302030
std::set<TTabletId> error_tablet_ids;
20312031
std::map<TTabletId, TVersion> succ_tablets;
2032-
// partition_id, tablet_id, publish_version
2033-
std::vector<std::tuple<int64_t, int64_t, int64_t>> discontinuous_version_tablets;
2032+
// partition_id, tablet_id, publish_version, commit_tso
2033+
std::vector<DiscontinuousVersionTablet> discontinuous_version_tablets;
20342034
std::map<TTableId, std::map<TTabletId, int64_t>> table_id_to_tablet_id_to_num_delta_rows;
20352035
uint32_t retry_time = 0;
20362036
Status status;
@@ -2087,8 +2087,8 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest&
20872087
}
20882088

20892089
for (auto& item : discontinuous_version_tablets) {
2090-
_engine.add_async_publish_task(std::get<0>(item), std::get<1>(item), std::get<2>(item),
2091-
publish_version_req.transaction_id, false);
2090+
_engine.add_async_publish_task(item.partition_id, item.tablet_id, item.publish_version,
2091+
publish_version_req.transaction_id, false, item.commit_tso);
20922092
}
20932093
TFinishTaskRequest finish_task_request;
20942094
if (!status.ok()) [[unlikely]] {

be/src/information_schema/schema_rowsets_scanner.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ std::vector<SchemaScanner::ColumnDesc> SchemaRowsetsScanner::_s_tbls_columns = {
6363
{"CREATION_TIME", TYPE_DATETIME, sizeof(int64_t), true},
6464
{"NEWEST_WRITE_TIMESTAMP", TYPE_DATETIME, sizeof(int64_t), true},
6565
{"SCHEMA_VERSION", TYPE_INT, sizeof(int32_t), true},
66+
{"COMMIT_TSO", TYPE_BIGINT, sizeof(int64_t), true},
6667

6768
};
6869

@@ -268,6 +269,16 @@ Status SchemaRowsetsScanner::_fill_block_impl(Block* block) {
268269
}
269270
RETURN_IF_ERROR(fill_dest_column_for_range(block, 12, datas));
270271
}
272+
// COMMIT_TSO
273+
{
274+
std::vector<int64_t> srcs(fill_rowsets_num);
275+
for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) {
276+
RowsetSharedPtr rowset = rowsets_[i];
277+
srcs[i - fill_idx_begin] = rowset->commit_tso();
278+
datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin;
279+
}
280+
RETURN_IF_ERROR(fill_dest_column_for_range(block, 13, datas));
281+
}
271282

272283
_rowsets_idx += fill_rowsets_num;
273284
return Status::OK();

be/src/service/http/action/pad_rowset_action.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ Status PadRowsetAction::_pad_rowset(Tablet* tablet, const Version& version) {
110110
auto writer = DORIS_TRY(tablet->create_rowset_writer(ctx, false));
111111
RowsetSharedPtr rowset;
112112
RETURN_IF_ERROR(writer->build(rowset));
113-
rowset->make_visible(version);
113+
rowset->make_visible(version, -1);
114114

115115
std::vector<RowsetSharedPtr> to_add {rowset};
116116
std::vector<RowsetSharedPtr> to_delete;

be/src/storage/data_dir.cpp

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -483,20 +483,21 @@ Status DataDir::load() {
483483
}
484484
}
485485

486-
auto load_pending_publish_info_func =
487-
[&engine = _engine](int64_t tablet_id, int64_t publish_version, std::string_view info) {
488-
PendingPublishInfoPB pending_publish_info_pb;
489-
bool parsed = pending_publish_info_pb.ParseFromArray(info.data(),
490-
cast_set<int>(info.size()));
491-
if (!parsed) {
492-
LOG(WARNING) << "parse pending publish info failed, tablet_id: " << tablet_id
493-
<< " publish_version: " << publish_version;
494-
}
495-
engine.add_async_publish_task(pending_publish_info_pb.partition_id(), tablet_id,
496-
publish_version,
497-
pending_publish_info_pb.transaction_id(), true);
498-
return true;
499-
};
486+
auto load_pending_publish_info_func = [&engine = _engine](int64_t tablet_id,
487+
int64_t publish_version,
488+
std::string_view info) {
489+
PendingPublishInfoPB pending_publish_info_pb;
490+
bool parsed =
491+
pending_publish_info_pb.ParseFromArray(info.data(), cast_set<int>(info.size()));
492+
if (!parsed) {
493+
LOG(WARNING) << "parse pending publish info failed, tablet_id: " << tablet_id
494+
<< " publish_version: " << publish_version;
495+
}
496+
engine.add_async_publish_task(pending_publish_info_pb.partition_id(), tablet_id,
497+
publish_version, pending_publish_info_pb.transaction_id(),
498+
true, pending_publish_info_pb.commit_tso());
499+
return true;
500+
};
500501
MonotonicStopWatch pending_publish_timer;
501502
pending_publish_timer.start();
502503
RETURN_IF_ERROR(

be/src/storage/olap_server.cpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1660,7 +1660,7 @@ void StorageEngine::_follow_cooldown_meta(TabletSharedPtr t) {
16601660

16611661
void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t tablet_id,
16621662
int64_t publish_version, int64_t transaction_id,
1663-
bool is_recovery) {
1663+
bool is_recovery, int64_t commit_tso) {
16641664
if (!is_recovery) {
16651665
bool exists = false;
16661666
{
@@ -1685,6 +1685,7 @@ void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t tablet_
16851685
PendingPublishInfoPB pending_publish_info_pb;
16861686
pending_publish_info_pb.set_partition_id(partition_id);
16871687
pending_publish_info_pb.set_transaction_id(transaction_id);
1688+
pending_publish_info_pb.set_commit_tso(commit_tso);
16881689
static_cast<void>(TabletMetaManager::save_pending_publish_info(
16891690
tablet->data_dir(), tablet->tablet_id(), publish_version,
16901691
pending_publish_info_pb.SerializeAsString()));
@@ -1693,7 +1694,7 @@ void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t tablet_
16931694
<< " version: " << publish_version << " txn_id:" << transaction_id
16941695
<< " is_recovery: " << is_recovery;
16951696
std::unique_lock<std::shared_mutex> wlock(_async_publish_lock);
1696-
_async_publish_tasks[tablet_id][publish_version] = {transaction_id, partition_id};
1697+
_async_publish_tasks[tablet_id][publish_version] = {transaction_id, partition_id, commit_tso};
16971698
}
16981699

16991700
int64_t StorageEngine::get_pending_publish_min_version(int64_t tablet_id) {
@@ -1730,8 +1731,9 @@ void StorageEngine::_process_async_publish() {
17301731

17311732
auto task_iter = tablet_iter->second.begin();
17321733
int64_t version = task_iter->first;
1733-
int64_t transaction_id = task_iter->second.first;
1734-
int64_t partition_id = task_iter->second.second;
1734+
int64_t transaction_id = std::get<0>(task_iter->second);
1735+
int64_t partition_id = std::get<1>(task_iter->second);
1736+
int64_t commit_tso = std::get<2>(task_iter->second);
17351737
int64_t max_version = tablet->max_version().second;
17361738

17371739
if (version <= max_version) {
@@ -1753,7 +1755,7 @@ void StorageEngine::_process_async_publish() {
17531755
}
17541756

17551757
auto async_publish_task = std::make_shared<AsyncTabletPublishTask>(
1756-
*this, tablet, partition_id, transaction_id, version);
1758+
*this, tablet, partition_id, transaction_id, version, commit_tso);
17571759
static_cast<void>(_tablet_publish_txn_thread_pool->submit_func(
17581760
[=]() { async_publish_task->handle(); }));
17591761
tablet_iter->second.erase(task_iter);

be/src/storage/rowset/rowset.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ Status Rowset::load(bool use_cache) {
8484
return Status::OK();
8585
}
8686

87-
void Rowset::make_visible(Version version) {
87+
void Rowset::make_visible(Version version, int64_t commit_tso) {
8888
_is_pending = false;
8989
_rowset_meta->set_version(version);
9090
_rowset_meta->set_rowset_state(VISIBLE);
@@ -95,6 +95,7 @@ void Rowset::make_visible(Version version) {
9595
if (_rowset_meta->has_delete_predicate()) {
9696
_rowset_meta->mutable_delete_predicate()->set_version(cast_set<int32_t>(version.first));
9797
}
98+
_rowset_meta->set_commit_tso(commit_tso);
9899
}
99100

100101
void Rowset::set_version(Version version) {

be/src/storage/rowset/rowset.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ class Rowset : public std::enable_shared_from_this<Rowset>, public MetadataAdder
140140
const std::string& tablet_path() const { return _tablet_path; }
141141

142142
// publish rowset to make it visible to read
143-
void make_visible(Version version);
143+
void make_visible(Version version, int64_t commit_tso);
144144
void set_version(Version version);
145145
const TabletSchemaSPtr& tablet_schema() const;
146146

@@ -166,6 +166,9 @@ class Rowset : public std::enable_shared_from_this<Rowset>, public MetadataAdder
166166
RowsetMetaPB get_rowset_pb() const { return rowset_meta()->get_rowset_pb(); }
167167
// The writing time of the newest data in rowset, to measure the freshness of a rowset.
168168
int64_t newest_write_timestamp() const { return rowset_meta()->newest_write_timestamp(); }
169+
// The commit tso of the newest data in rowset.
170+
int64_t commit_tso() const { return rowset_meta()->commit_tso(); }
171+
169172
bool is_segments_overlapping() const { return rowset_meta()->is_segments_overlapping(); }
170173
KeysType keys_type() { return _schema->keys_type(); }
171174
RowsetStatePB rowset_meta_state() const { return rowset_meta()->rowset_state(); }

be/src/storage/rowset/rowset_meta.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,10 @@ class RowsetMeta : public MetadataAdder<RowsetMeta> {
462462
[algorithm]() -> Result<EncryptionAlgorithmPB> { return algorithm; });
463463
}
464464

465+
int64_t commit_tso() const { return _rowset_meta_pb.commit_tso(); }
466+
467+
void set_commit_tso(int64_t commit_tso) { _rowset_meta_pb.set_commit_tso(commit_tso); }
468+
465469
private:
466470
bool _deserialize_from_pb(std::string_view value);
467471

be/src/storage/storage_engine.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ class StorageEngine final : public BaseStorageEngine {
361361
void gc_binlogs(const std::unordered_map<int64_t, int64_t>& gc_tablet_infos);
362362

363363
void add_async_publish_task(int64_t partition_id, int64_t tablet_id, int64_t publish_version,
364-
int64_t transaction_id, bool is_recover);
364+
int64_t transaction_id, bool is_recover, int64_t commit_tso);
365365
int64_t get_pending_publish_min_version(int64_t tablet_id);
366366

367367
bool add_broken_path(std::string path);
@@ -583,8 +583,9 @@ class StorageEngine final : public BaseStorageEngine {
583583

584584
std::mutex _cumu_compaction_delay_mtx;
585585

586-
// tablet_id, publish_version, transaction_id, partition_id
587-
std::map<int64_t, std::map<int64_t, std::pair<int64_t, int64_t>>> _async_publish_tasks;
586+
// tablet_id, publish_version, transaction_id, partition_id, commit_tso
587+
std::map<int64_t, std::map<int64_t, std::tuple<int64_t, int64_t, int64_t>>>
588+
_async_publish_tasks;
588589
// aync publish for discontinuous versions of merge_on_write table
589590
std::shared_ptr<Thread> _async_publish_thread;
590591
std::shared_mutex _async_publish_lock;

0 commit comments

Comments
 (0)