Skip to content

Commit bcce199

Browse files
fix up
1 parent a505c15 commit bcce199

4 files changed

Lines changed: 18 additions & 9 deletions

File tree

temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncActivityPollTask.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,9 @@ public CompletableFuture<ActivityTask> poll(SlotPermit permit) {
9898
.withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
9999
.pollActivityTaskQueue(pollRequest)));
100100
} catch (Exception e) {
101+
MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.ACTIVITY_TASK)
102+
.gauge(MetricsType.NUM_POLLERS)
103+
.update(pollGauge.decrementAndGet());
101104
throw new RuntimeException(e);
102105
}
103106

temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncNexusPollTask.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@ public CompletableFuture<NexusTask> poll(SlotPermit permit) {
9090
.withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
9191
.pollNexusTaskQueue(pollRequest)));
9292
} catch (Exception e) {
93+
MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.NEXUS_TASK)
94+
.gauge(MetricsType.NUM_POLLERS)
95+
.update(pollGauge.decrementAndGet());
9396
throw new RuntimeException(e);
9497
}
9598

temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncPoller.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -213,30 +213,30 @@ public void run() {
213213
}
214214

215215
if (shouldTerminate()) {
216-
return;
216+
continue;
217217
}
218218

219219
pollerBalancer.balance(asyncTaskPoller.getLabel());
220220
if (shouldTerminate()) {
221-
return;
221+
continue;
222222
}
223223
// Reserve a slot for the poll request
224224
SlotSupplierFuture future;
225225
try {
226226
future = slotSupplier.reserveSlot(slotReservationData);
227227
} catch (Exception e) {
228228
log.warn("Error while trying to reserve a slot", e.getCause());
229-
return;
229+
continue;
230230
}
231231
permit = BasePoller.getSlotPermitAndHandleInterrupts(future, slotSupplier);
232232
if (permit == null || shouldTerminate()) {
233-
return;
233+
continue;
234234
}
235235

236236
pollerSemaphore.acquire();
237237
pollerSemaphoreAcquired = true;
238238
if (shouldTerminate()) {
239-
return;
239+
continue;
240240
}
241241
workerMetricsScope.counter(MetricsType.POLLER_START_COUNTER).inc(1);
242242

@@ -299,13 +299,13 @@ public void run() {
299299
if (shouldTerminate()) {
300300
pollerBalancer.removePoller(asyncTaskPoller.getLabel());
301301
abort = true;
302+
log.info(
303+
"Poll loop is terminated: {} - {}",
304+
AsyncPoller.this.getClass().getSimpleName(),
305+
asyncTaskPoller.getLabel());
302306
}
303307
}
304308
}
305-
log.info(
306-
"Poll loop is terminated: {} - {}",
307-
AsyncPoller.this.getClass().getSimpleName(),
308-
asyncTaskPoller.getLabel());
309309
}
310310
}
311311

temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncWorkflowPollTask.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,9 @@ public CompletableFuture<WorkflowTask> poll(SlotPermit permit)
136136
.withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
137137
.pollWorkflowTaskQueue(pollRequest)));
138138
} catch (Exception e) {
139+
MetricsTag.tagged(metricsScope, taskQueueTagValue)
140+
.gauge(MetricsType.NUM_POLLERS)
141+
.update(pollGauge.decrementAndGet());
139142
throw new RuntimeException(e);
140143
}
141144

0 commit comments

Comments
 (0)