diff --git a/sdk-core/src/main/java/io/milvus/v2/client/globalcluster/TopologyRefresher.java b/sdk-core/src/main/java/io/milvus/v2/client/globalcluster/TopologyRefresher.java index 04f1b1bed..a392bddd7 100644 --- a/sdk-core/src/main/java/io/milvus/v2/client/globalcluster/TopologyRefresher.java +++ b/sdk-core/src/main/java/io/milvus/v2/client/globalcluster/TopologyRefresher.java @@ -25,6 +25,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @@ -37,6 +38,7 @@ public class TopologyRefresher { private final Consumer onTopologyChange; private final ScheduledExecutorService scheduler; private final AtomicLong currentVersion; + private final AtomicBoolean refreshing = new AtomicBoolean(false); public TopologyRefresher(String globalEndpoint, String token, long initialVersion, Consumer onTopologyChange) { @@ -59,7 +61,16 @@ public void start() { } public void triggerRefresh() { - scheduler.submit(this::refresh); + if (refreshing.getAndSet(true)) { + logger.debug("Topology refresh already in progress, skipping"); + return; + } + try { + scheduler.submit(this::refreshWithCleanup); + } catch (Exception e) { + refreshing.set(false); + logger.warn("Failed to submit topology refresh task: {}", e.getMessage()); + } } public void stop() { @@ -67,6 +78,14 @@ public void stop() { logger.info("Global topology refresher stopped for endpoint: {}", globalEndpoint); } + private void refreshWithCleanup() { + try { + refresh(); + } finally { + refreshing.set(false); + } + } + private void refresh() { try { GlobalTopology topology = GlobalClusterUtils.fetchTopology(globalEndpoint, token);