Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion be/src/cloud/cloud_backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,11 @@ void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
.tag("request_type", "SET_JOB")
.tag("job_id", request.job_id);
if (request.__isset.event) {
st = manager.set_event(request.job_id, request.event);
const std::vector<int64_t>* table_ids_ptr = nullptr;
if (request.__isset.table_ids) {
table_ids_ptr = &request.table_ids;
}
st = manager.set_event(request.job_id, request.event, false, table_ids_ptr);
if (st.ok()) {
break;
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_delete_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ Status CloudDeleteTask::execute(CloudStorageEngine& engine, const TPushReq& requ
return st;
}

st = engine.meta_mgr().commit_rowset(*rowset->rowset_meta(), "");
st = engine.meta_mgr().commit_rowset(*rowset->rowset_meta(), "", nullptr, tablet->table_id());
if (!st.ok()) {
LOG(WARNING) << "failed to commit rowset, status=" << st.to_string();
return st;
Expand Down
6 changes: 4 additions & 2 deletions be/src/cloud/cloud_delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ Status CloudDeltaWriter::commit_rowset() {
}

// Handle normal rowset with data
return _engine.meta_mgr().commit_rowset(*rowset_meta(), "");
return _engine.meta_mgr().commit_rowset(*rowset_meta(), "", nullptr,
rowset_builder()->tablet()->table_id());
}

Status CloudDeltaWriter::_commit_empty_rowset() {
Expand All @@ -138,7 +139,8 @@ Status CloudDeltaWriter::_commit_empty_rowset() {
return Status::OK();
}
// write an empty rowset kv to keep version continuous
return _engine.meta_mgr().commit_rowset(*rowset_meta(), "");
return _engine.meta_mgr().commit_rowset(*rowset_meta(), "", nullptr,
rowset_builder()->tablet()->table_id());
}

Status CloudDeltaWriter::set_txn_related_info() {
Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1336,7 +1336,7 @@ Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, const std::string
}

Status CloudMetaMgr::commit_rowset(RowsetMeta& rs_meta, const std::string& job_id,
RowsetMetaSharedPtr* existed_rs_meta) {
RowsetMetaSharedPtr* existed_rs_meta, int64_t table_id) {
VLOG_DEBUG << "commit rowset, tablet_id: " << rs_meta.tablet_id()
<< ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id();
{
Expand Down Expand Up @@ -1379,7 +1379,7 @@ Status CloudMetaMgr::commit_rowset(RowsetMeta& rs_meta, const std::string& job_i
<< ", with timeout: " << timeout_ms << " ms";
}
auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
manager.warm_up_rowset(rs_meta, timeout_ms);
manager.warm_up_rowset(rs_meta, table_id, timeout_ms);
return st;
}

Expand Down
3 changes: 2 additions & 1 deletion be/src/cloud/cloud_meta_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ class CloudMetaMgr {
std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr);

Status commit_rowset(RowsetMeta& rs_meta, const std::string& job_id,
std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr);
std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr,
int64_t table_id = 0);
void cache_committed_rowset(RowsetMetaSharedPtr rs_meta, int64_t expiration_time);

Status update_tmp_rowset(const RowsetMeta& rs_meta);
Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -418,8 +418,8 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
st.to_string());
}

st = _cloud_storage_engine.meta_mgr().commit_rowset(*rowset_writer->rowset_meta(), _job_id,
&existed_rs_meta);
st = _cloud_storage_engine.meta_mgr().commit_rowset(
*rowset_writer->rowset_meta(), _job_id, &existed_rs_meta, _new_tablet->table_id());
if (!st.ok()) {
if (st.is<ALREADY_EXIST>()) {
LOG(INFO) << "Rowset " << rs_reader->version() << " has already existed in tablet "
Expand Down
55 changes: 45 additions & 10 deletions be/src/cloud/cloud_warm_up_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,8 @@ Status CloudWarmUpManager::clear_job(int64_t job_id) {
return st;
}

Status CloudWarmUpManager::set_event(int64_t job_id, TWarmUpEventType::type event, bool clear) {
Status CloudWarmUpManager::set_event(int64_t job_id, TWarmUpEventType::type event, bool clear,
const std::vector<int64_t>* table_ids) {
DBUG_EXECUTE_IF("CloudWarmUpManager.set_event.ignore_all", {
LOG(INFO) << "Ignore set_event request, job_id=" << job_id << ", event=" << event
<< ", clear=" << clear;
Expand All @@ -470,24 +471,56 @@ Status CloudWarmUpManager::set_event(int64_t job_id, TWarmUpEventType::type even
if (event == TWarmUpEventType::type::LOAD) {
if (clear) {
_tablet_replica_cache.erase(job_id);
_event_driven_filters.erase(job_id);
LOG(INFO) << "Clear event driven sync, job_id=" << job_id << ", event=" << event;
} else if (!_tablet_replica_cache.contains(job_id)) {
static_cast<void>(_tablet_replica_cache[job_id]);
LOG(INFO) << "Set event driven sync, job_id=" << job_id << ", event=" << event;
if (table_ids != nullptr) {
// table-level filter: set to the given table_id set (may be empty,
// meaning all matched tables were deleted — warm up nothing)
_event_driven_filters[job_id] =
std::unordered_set<int64_t>(table_ids->begin(), table_ids->end());
LOG(INFO) << "Set event driven sync with table filter, job_id=" << job_id
<< ", event=" << event << ", table_ids_size=" << table_ids->size();
} else {
// cluster-level: no filter, warm up all tables
_event_driven_filters[job_id] = std::nullopt;
LOG(INFO) << "Set event driven sync, job_id=" << job_id << ", event=" << event;
}
} else if (table_ids != nullptr) {
// Update table_ids for an existing job (may be empty)
_event_driven_filters[job_id] =
std::unordered_set<int64_t>(table_ids->begin(), table_ids->end());
LOG(INFO) << "Updated table filter for event driven sync, job_id=" << job_id
<< ", table_ids_size=" << table_ids->size();
}
} else {
st = Status::InternalError("The event {} is not supported yet", event);
}
return st;
}

std::vector<TReplicaInfo> CloudWarmUpManager::get_replica_info(int64_t tablet_id, bool bypass_cache,
bool& cache_hit) {
std::vector<TReplicaInfo> CloudWarmUpManager::get_replica_info(int64_t tablet_id, int64_t table_id,
bool bypass_cache, bool& cache_hit) {
std::vector<TReplicaInfo> replicas;
std::vector<int64_t> cancelled_jobs;
std::lock_guard<std::mutex> lock(_mtx);
cache_hit = false;
for (auto& [job_id, cache] : _tablet_replica_cache) {
// Table-level filter: skip this job when it has a filter that does not contain table_id.
// table_id == 0 means the caller has no table context (e.g., recycle_cache),
// so filtering is skipped for that call.
if (table_id != 0) {
auto filter_it = _event_driven_filters.find(job_id);
if (filter_it != _event_driven_filters.end() && filter_it->second.has_value()) {
if (filter_it->second->find(table_id) == filter_it->second->end()) {
VLOG_DEBUG << "get_replica_info: table_id=" << table_id
<< " not in filter for job_id=" << job_id << ", skipping";
continue;
}
}
}

if (!bypass_cache) {
auto it = cache.find(tablet_id);
if (it != cache.end()) {
Expand Down Expand Up @@ -580,13 +613,14 @@ std::vector<TReplicaInfo> CloudWarmUpManager::get_replica_info(int64_t tablet_id
return replicas;
}

void CloudWarmUpManager::warm_up_rowset(RowsetMeta& rs_meta, int64_t sync_wait_timeout_ms) {
void CloudWarmUpManager::warm_up_rowset(RowsetMeta& rs_meta, int64_t table_id,
int64_t sync_wait_timeout_ms) {
bthread::Mutex mu;
bthread::ConditionVariable cv;
std::unique_lock<bthread::Mutex> lock(mu);
auto st = _thread_pool_token->submit_func([&, this]() {
std::unique_lock<bthread::Mutex> l(mu);
_warm_up_rowset(rs_meta, sync_wait_timeout_ms);
_warm_up_rowset(rs_meta, table_id, sync_wait_timeout_ms);
cv.notify_one();
});
if (!st.ok()) {
Expand All @@ -597,9 +631,10 @@ void CloudWarmUpManager::warm_up_rowset(RowsetMeta& rs_meta, int64_t sync_wait_t
}
}

void CloudWarmUpManager::_warm_up_rowset(RowsetMeta& rs_meta, int64_t sync_wait_timeout_ms) {
void CloudWarmUpManager::_warm_up_rowset(RowsetMeta& rs_meta, int64_t table_id,
int64_t sync_wait_timeout_ms) {
bool cache_hit = false;
auto replicas = get_replica_info(rs_meta.tablet_id(), false, cache_hit);
auto replicas = get_replica_info(rs_meta.tablet_id(), table_id, false, cache_hit);
if (replicas.empty()) {
VLOG_DEBUG << "There is no need to warmup tablet=" << rs_meta.tablet_id()
<< ", skipping rowset=" << rs_meta.rowset_id().to_string();
Expand All @@ -608,7 +643,7 @@ void CloudWarmUpManager::_warm_up_rowset(RowsetMeta& rs_meta, int64_t sync_wait_
}
Status st = _do_warm_up_rowset(rs_meta, replicas, sync_wait_timeout_ms, !cache_hit);
if (cache_hit && !st.ok() && st.is<ErrorCode::TABLE_NOT_FOUND>()) {
replicas = get_replica_info(rs_meta.tablet_id(), true, cache_hit);
replicas = get_replica_info(rs_meta.tablet_id(), table_id, true, cache_hit);
st = _do_warm_up_rowset(rs_meta, replicas, sync_wait_timeout_ms, true);
}
if (!st.ok()) {
Expand Down Expand Up @@ -754,7 +789,7 @@ void CloudWarmUpManager::_recycle_cache(int64_t tablet_id,
const std::vector<RecycledRowsets>& rowsets) {
LOG(INFO) << "recycle_cache: tablet_id=" << tablet_id << ", num_rowsets=" << rowsets.size();
bool cache_hit = false;
auto replicas = get_replica_info(tablet_id, false, cache_hit);
auto replicas = get_replica_info(tablet_id, /*table_id=*/0, false, cache_hit);
if (replicas.empty()) {
return;
}
Expand Down
20 changes: 15 additions & 5 deletions be/src/cloud/cloud_warm_up_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <unordered_set>
#include <vector>

#include "cloud/cloud_storage_engine.h"
Expand All @@ -39,6 +41,11 @@ enum class DownloadType {
S3,
};

// Filter for event-driven warmup jobs, stored per job_id.
// nullopt = cluster-level (no table filter, warm up all tables)
// has_value = table-level filter (only warm up tables whose table id is in the set)
// An engaged filter holding an empty set is valid and matches no table, so
// nothing is warmed up for that job.
// Callers that pass table_id == 0 (no table context) are not filtered at all.
using EventDrivenJobFilter = std::optional<std::unordered_set<int64_t>>;

struct JobMeta {
JobMeta() = default;
JobMeta(const TJobMeta& meta);
Expand Down Expand Up @@ -75,7 +82,8 @@ class CloudWarmUpManager {
// Cancel the job
Status clear_job(int64_t job_id);

Status set_event(int64_t job_id, TWarmUpEventType::type event, bool clear = false);
Status set_event(int64_t job_id, TWarmUpEventType::type event, bool clear = false,
const std::vector<int64_t>* table_ids = nullptr);

// If `sync_wait_timeout_ms` <= 0, the function will send the warm-up RPC
// and return immediately without waiting for the warm-up to complete.
Expand All @@ -85,7 +93,7 @@ class CloudWarmUpManager {
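// @param rs_meta Metadata of the rowset to be warmed up.
// @param table_id Id of the table the rowset belongs to. Jobs whose table filter
// does not contain this id are skipped; 0 means no table context, so no
// table filtering is applied.
// @param sync_wait_timeout_ms Timeout in milliseconds to wait for the warm-up
// to complete. Non-positive value means no waiting.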
void warm_up_rowset(RowsetMeta& rs_meta, int64_t sync_wait_timeout_ms = -1);
void warm_up_rowset(RowsetMeta& rs_meta, int64_t table_id, int64_t sync_wait_timeout_ms = -1);

void recycle_cache(int64_t tablet_id, const std::vector<RecycledRowsets>& rowsets);

Expand All @@ -105,10 +113,10 @@ class CloudWarmUpManager {
Status _do_warm_up_rowset(RowsetMeta& rs_meta, std::vector<TReplicaInfo>& replicas,
int64_t sync_wait_timeout_ms, bool skip_existence_check);

std::vector<TReplicaInfo> get_replica_info(int64_t tablet_id, bool bypass_cache,
bool& cache_hit);
std::vector<TReplicaInfo> get_replica_info(int64_t tablet_id, int64_t table_id,
bool bypass_cache, bool& cache_hit);

void _warm_up_rowset(RowsetMeta& rs_meta, int64_t sync_wait_timeout_ms);
void _warm_up_rowset(RowsetMeta& rs_meta, int64_t table_id, int64_t sync_wait_timeout_ms);
void _recycle_cache(int64_t tablet_id, const std::vector<RecycledRowsets>& rowsets);

void submit_download_tasks(io::Path path, int64_t file_size, io::FileSystemSPtr file_system,
Expand All @@ -133,6 +141,8 @@ class CloudWarmUpManager {
using Cache = std::unordered_map<int64_t, CacheEntry>;
// job_id -> cache
std::unordered_map<int64_t, Cache> _tablet_replica_cache;
// job_id -> table filter (nullopt = cluster-level, no filter)
std::unordered_map<int64_t, EventDrivenJobFilter> _event_driven_filters;
std::unique_ptr<ThreadPool> _thread_pool;
std::unique_ptr<ThreadPoolToken> _thread_pool_token;

Expand Down
3 changes: 2 additions & 1 deletion be/src/storage/compaction/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1635,7 +1635,8 @@ Status CloudCompactionMixin::execute_compact_impl(int64_t permits) {
// Currently, updates are only made in the time_series.
update_compaction_level();

RETURN_IF_ERROR(_engine.meta_mgr().commit_rowset(*_output_rowset->rowset_meta().get(), _uuid));
RETURN_IF_ERROR(_engine.meta_mgr().commit_rowset(*_output_rowset->rowset_meta().get(), _uuid,
nullptr, _tablet->table_id()));

// 4. modify rowsets in memory
RETURN_IF_ERROR(modify_rowsets());
Expand Down
Loading
Loading