2525import java .util .concurrent .Executors ;
2626import java .util .concurrent .ScheduledExecutorService ;
2727import java .util .concurrent .TimeUnit ;
28+ import java .util .concurrent .atomic .AtomicBoolean ;
2829import java .util .concurrent .atomic .AtomicLong ;
2930import java .util .function .Consumer ;
3031
@@ -37,6 +38,7 @@ public class TopologyRefresher {
3738 private final Consumer <GlobalTopology > onTopologyChange ;
3839 private final ScheduledExecutorService scheduler ;
3940 private final AtomicLong currentVersion ;
41+ private final AtomicBoolean refreshing = new AtomicBoolean (false );
4042
4143 public TopologyRefresher (String globalEndpoint , String token , long initialVersion ,
4244 Consumer <GlobalTopology > onTopologyChange ) {
@@ -59,14 +61,26 @@ public void start() {
5961 }
6062
6163 public void triggerRefresh () {
62- scheduler .submit (this ::refresh );
64+ if (refreshing .getAndSet (true )) {
65+ logger .debug ("Topology refresh already in progress, skipping" );
66+ return ;
67+ }
68+ scheduler .submit (this ::refreshWithCleanup );
6369 }
6470
6571 public void stop () {
6672 scheduler .shutdownNow ();
6773 logger .info ("Global topology refresher stopped for endpoint: {}" , globalEndpoint );
6874 }
6975
76+ private void refreshWithCleanup () {
77+ try {
78+ refresh ();
79+ } finally {
80+ refreshing .set (false );
81+ }
82+ }
83+
7084 private void refresh () {
7185 try {
7286 GlobalTopology topology = GlobalClusterUtils .fetchTopology (globalEndpoint , token );
0 commit comments