Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

Expand All @@ -37,6 +38,7 @@ public class TopologyRefresher {
private final Consumer<GlobalTopology> onTopologyChange;
private final ScheduledExecutorService scheduler;
private final AtomicLong currentVersion;
private final AtomicBoolean refreshing = new AtomicBoolean(false);

public TopologyRefresher(String globalEndpoint, String token, long initialVersion,
Consumer<GlobalTopology> onTopologyChange) {
Expand All @@ -59,14 +61,31 @@ public void start() {
}

public void triggerRefresh() {
scheduler.submit(this::refresh);
if (refreshing.getAndSet(true)) {
logger.debug("Topology refresh already in progress, skipping");
return;
}
try {
scheduler.submit(this::refreshWithCleanup);
} catch (Exception e) {
refreshing.set(false);
logger.warn("Failed to submit topology refresh task: {}", e.getMessage());
}
}
Comment thread
yhmo marked this conversation as resolved.

public void stop() {
scheduler.shutdownNow();
logger.info("Global topology refresher stopped for endpoint: {}", globalEndpoint);
}

private void refreshWithCleanup() {
try {
refresh();
} finally {
refreshing.set(false);
}
}

private void refresh() {
try {
GlobalTopology topology = GlobalClusterUtils.fetchTopology(globalEndpoint, token);
Expand Down
Loading