2525import java .util .concurrent .Executors ;
2626import java .util .concurrent .ScheduledExecutorService ;
2727import java .util .concurrent .TimeUnit ;
28+ import java .util .concurrent .atomic .AtomicBoolean ;
2829import java .util .concurrent .atomic .AtomicLong ;
2930import java .util .function .Consumer ;
3031
@@ -37,6 +38,7 @@ public class TopologyRefresher {
3738 private final Consumer <GlobalTopology > onTopologyChange ;
3839 private final ScheduledExecutorService scheduler ;
3940 private final AtomicLong currentVersion ;
41+ private final AtomicBoolean refreshing = new AtomicBoolean (false );
4042
4143 public TopologyRefresher (String globalEndpoint , String token , long initialVersion ,
4244 Consumer <GlobalTopology > onTopologyChange ) {
@@ -59,14 +61,31 @@ public void start() {
5961 }
6062
6163 public void triggerRefresh () {
62- scheduler .submit (this ::refresh );
64+ if (refreshing .getAndSet (true )) {
65+ logger .debug ("Topology refresh already in progress, skipping" );
66+ return ;
67+ }
68+ try {
69+ scheduler .submit (this ::refreshWithCleanup );
70+ } catch (Exception e ) {
71+ refreshing .set (false );
72+ logger .warn ("Failed to submit topology refresh task: {}" , e .getMessage ());
73+ }
6374 }
6475
6576 public void stop () {
6677 scheduler .shutdownNow ();
6778 logger .info ("Global topology refresher stopped for endpoint: {}" , globalEndpoint );
6879 }
6980
81+ private void refreshWithCleanup () {
82+ try {
83+ refresh ();
84+ } finally {
85+ refreshing .set (false );
86+ }
87+ }
88+
7089 private void refresh () {
7190 try {
7291 GlobalTopology topology = GlobalClusterUtils .fetchTopology (globalEndpoint , token );
0 commit comments