diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java index 3e57dce297..caafa1ec60 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java @@ -67,6 +67,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -169,6 +170,8 @@ public class FlinkAppHttpWatcher { private static final Byte DEFAULT_FLAG_BYTE = Byte.valueOf("0"); + private static final Set WATCHING_IN_PROGRESS = ConcurrentHashMap.newKeySet(); + @Qualifier("flinkRestAPIWatchingExecutor") @Autowired private Executor watchExecutor; @@ -210,7 +213,17 @@ public void start() { || timeMillis - lastOptionTime <= OPTION_INTERVAL.toMillis() || timeMillis - lastWatchTime >= WATCHING_INTERVAL.toMillis()) { lastWatchTime = timeMillis; - WATCHING_APPS.forEach(this::watch); + WATCHING_APPS.forEach((appId, application) -> { + if (WATCHING_IN_PROGRESS.add(appId)) { + try { + watch(appId, application); + } finally { + WATCHING_IN_PROGRESS.remove(appId); + } + } else { + log.info("App {} is already being watched, skipping", appId); + } + }); } }