Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 20 additions & 27 deletions fdbserver/datadistributor/DDShardTracker.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1139,17 +1139,14 @@ Future<Void> fetchTopKShardMetrics_impl(DataDistributionTracker* self, GetTopKMe
}
}

ACTOR Future<Void> fetchTopKShardMetrics(DataDistributionTracker* self, GetTopKMetricsRequest req) {
choose {
// simulate time_out
when(wait(g_network->isSimulated() && BUGGIFY_WITH_PROB(0.01) ? Never()
: fetchTopKShardMetrics_impl(self, req))) {}
when(wait(delay(SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT))) {
CODE_PROBE(true, "TopK DD_SHARD_METRICS_TIMEOUT");
req.reply.send(GetTopKMetricsReply());
}
Future<Void> fetchTopKShardMetrics(DataDistributionTracker* self, GetTopKMetricsRequest req) {
// simulate time_out
Future<Void> f =
g_network->isSimulated() && BUGGIFY_WITH_PROB(0.01) ? Never() : fetchTopKShardMetrics_impl(self, req);
if (auto const res = co_await timeout(f, SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT); !res.present()) {
CODE_PROBE(true, "TopK DD_SHARD_METRICS_TIMEOUT");
req.reply.send(GetTopKMetricsReply());
}
return Void();
}

Future<Void> fetchShardMetrics_impl(DataDistributionTracker* self, GetMetricsRequest req) {
Expand Down Expand Up @@ -1180,17 +1177,15 @@ Future<Void> fetchShardMetrics_impl(DataDistributionTracker* self, GetMetricsReq
}
}

ACTOR Future<Void> fetchShardMetrics(DataDistributionTracker* self, GetMetricsRequest req) {
choose {
when(wait(fetchShardMetrics_impl(self, req))) {}
when(wait(delay(SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT, TaskPriority::DataDistribution))) {
CODE_PROBE(true, "DD_SHARD_METRICS_TIMEOUT");
StorageMetrics largeMetrics;
largeMetrics.bytes = getMaxShardSize(self->dbSizeEstimate->get());
req.reply.send(largeMetrics);
}
Future<Void> fetchShardMetrics(DataDistributionTracker* self, GetMetricsRequest req) {
if (auto const res = co_await timeout(
fetchShardMetrics_impl(self, req), SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT, TaskPriority::DataDistribution);
!res.present()) {
CODE_PROBE(true, "DD_SHARD_METRICS_TIMEOUT");
StorageMetrics largeMetrics;
largeMetrics.bytes = getMaxShardSize(self->dbSizeEstimate->get());
req.reply.send(largeMetrics);
}
return Void();
}

Future<Void> fetchShardMetricsList_impl(DataDistributionTracker* self, GetMetricsListRequest req) {
Expand Down Expand Up @@ -1233,14 +1228,12 @@ Future<Void> fetchShardMetricsList_impl(DataDistributionTracker* self, GetMetric
}
}

ACTOR Future<Void> fetchShardMetricsList(DataDistributionTracker* self, GetMetricsListRequest req) {
choose {
when(wait(fetchShardMetricsList_impl(self, req))) {}
when(wait(delay(SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT))) {
req.reply.sendError(timed_out());
}
Future<Void> fetchShardMetricsList(DataDistributionTracker* self, GetMetricsListRequest req) {
if (auto const res =
co_await timeout(fetchShardMetricsList_impl(self, req), SERVER_KNOBS->DD_SHARD_METRICS_TIMEOUT);
!res.present()) {
req.reply.sendError(timed_out());
}
return Void();
}

void triggerStorageQueueRebalance(DataDistributionTracker* self, RebalanceStorageQueueRequest req) {
Expand Down
4 changes: 2 additions & 2 deletions flow/include/flow/genericactors.actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,8 @@ Future<T> timeout(Future<T> what, double time, T timedoutValue, TaskPriority tas
}

ACTOR template <class T>
Future<Optional<T>> timeout(Future<T> what, double time) {
Future<Void> end = delay(time);
Future<Optional<T>> timeout(Future<T> what, double time, TaskPriority taskID = TaskPriority::DefaultDelay) {
Future<Void> end = delay(time, taskID);
choose {
when(T t = wait(what)) {
return t;
Expand Down