@@ -84,8 +84,6 @@ class TaskRunner {
8484 private static final double LEASE_EXTEND_DURATION_FACTOR = 0.8 ;
8585 private final ScheduledExecutorService leaseExtendExecutorService ;
8686 private Map <String , ScheduledFuture <?>> leaseExtendMap = new ConcurrentHashMap <>();
87- private final boolean trackActiveWorkers ;
88- private final boolean trackDiagnosticEvents ;
8987 private final AtomicInteger activeWorkerCount = new AtomicInteger (0 );
9088
9189 TaskRunner (Worker worker ,
@@ -97,8 +95,6 @@ class TaskRunner {
9795 int taskPollTimeout ,
9896 List <PollFilter > pollFilters ,
9997 EventDispatcher <TaskRunnerEvent > eventDispatcher ,
100- boolean trackActiveWorkers ,
101- boolean trackDiagnosticEvents ,
10298 boolean useVirtualThreads ) {
10399 this .worker = worker ;
104100 this .taskClient = taskClient ;
@@ -109,8 +105,6 @@ class TaskRunner {
109105 this .permits = new Semaphore (threadCount );
110106 this .pollFilters = pollFilters ;
111107 this .eventDispatcher = eventDispatcher ;
112- this .trackActiveWorkers = trackActiveWorkers ;
113- this .trackDiagnosticEvents = trackDiagnosticEvents ;
114108 this .tasksTobeExecuted = new LinkedBlockingQueue <>();
115109 this .enableUpdateV2 = Boolean .parseBoolean (System .getProperty ("taskUpdateV2" , "false" )) || Boolean .parseBoolean (System .getenv ("taskUpdateV2" ));
116110
@@ -251,9 +245,7 @@ private List<Task> pollTasksForWorker() {
251245
252246 if (worker .paused ()) {
253247 LOGGER .trace ("Worker {} has been paused. Not polling anymore!" , worker .getClass ());
254- if (trackDiagnosticEvents ) {
255- eventDispatcher .publish (new TaskPaused (taskType ));
256- }
248+ eventDispatcher .publish (new TaskPaused (taskType ));
257249 return List .of ();
258250 }
259251
@@ -270,9 +262,7 @@ private List<Task> pollTasksForWorker() {
270262 }
271263
272264 if (pollCount == 0 ) {
273- if (trackDiagnosticEvents ) {
274- eventDispatcher .publish (new TaskExecutionQueueFull (taskType ));
275- }
265+ eventDispatcher .publish (new TaskExecutionQueueFull (taskType ));
276266 return List .of ();
277267 }
278268
@@ -352,9 +342,7 @@ private void onUncaughtException(Thread thread, Throwable error) {
352342
353343 private Task processTask (Task task ) {
354344 eventDispatcher .publish (new TaskExecutionStarted (taskType , task .getTaskId (), worker .getIdentity ()));
355- if (trackActiveWorkers ) {
356- eventDispatcher .publish (new ActiveWorkersChanged (taskType , activeWorkerCount .incrementAndGet ()));
357- }
345+ eventDispatcher .publish (new ActiveWorkersChanged (taskType , activeWorkerCount .incrementAndGet ()));
358346
359347 // record execution start time for a task
360348 task .getExecutionMetadata ().setExecutionStartTime (System .currentTimeMillis ());
@@ -377,9 +365,7 @@ private Task processTask(Task task) {
377365 } finally {
378366 cancelLeaseExtension (task .getTaskId ());
379367 permits .release ();
380- if (trackActiveWorkers ) {
381- eventDispatcher .publish (new ActiveWorkersChanged (taskType , activeWorkerCount .decrementAndGet ()));
382- }
368+ eventDispatcher .publish (new ActiveWorkersChanged (taskType , activeWorkerCount .decrementAndGet ()));
383369 }
384370 return task ;
385371 }
0 commit comments