Skip to content

Commit a28a924

Browse files
update doc and add missing method in configurer
1 parent 1023f9d commit a28a924

2 files changed

Lines changed: 34 additions & 18 deletions

File tree

INTERCEPTOR.md

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ The core event routing component that manages listener registration and event pu
133133

134134
**Key Features**:
135135
- Generic type parameter ensures type safety
136-
- Thread-safe using `ConcurrentHashMap` and `CopyOnWriteArrayList`
136+
- Thread-safe using nested `ConcurrentHashMap` structures
137137
- Asynchronous event publishing via `CompletableFuture.runAsync()`
138138
- Supports both specific event type listeners and "promiscuous" listeners (listening to all events)
139139

@@ -390,14 +390,11 @@ TaskRunnerConfigurer configurer = new TaskRunnerConfigurer.Builder(taskClient, w
390390
System.err.println("Task " + event.getTaskId() +
391391
" failed: " + event.getCause().getMessage());
392392
})
393-
// Listen to ALL task runner events
394-
.withListener(TaskRunnerEvent.class, event -> {
395-
System.out.println("Event: " + event.getClass().getSimpleName() +
396-
" for task type: " + event.getTaskType());
397-
})
398393
.build();
399394
```
400395

396+
> **Note:** Registering with a parent class like `TaskRunnerEvent.class` does **not** create a catch-all listener. The dispatcher routes by exact event class. The only "promiscuous" key is `ConductorClientEvent.class`, which receives all events regardless of type.
397+
401398
#### Approach 2: Implementing Custom MetricsCollector
402399

403400
```java
@@ -511,9 +508,13 @@ public class TaskMonitor implements TaskRunnerEventsListener {
511508
}
512509
}
513510

514-
// Register manually using EventDispatcher
515-
EventDispatcher<TaskRunnerEvent> dispatcher = new EventDispatcher<>();
516-
ListenerRegister.register(new TaskMonitor(), dispatcher);
511+
// Register a TaskRunnerEventsListener via the builder's dispatcher
512+
TaskRunnerConfigurer.Builder builder =
513+
new TaskRunnerConfigurer.Builder(taskClient, workers);
514+
ListenerRegister.register(new TaskMonitor(), builder.getEventDispatcher());
515+
516+
TaskRunnerConfigurer configurer = builder.build();
517+
configurer.init();
517518
```
518519

519520
### Workflow and Task Client Event Listeners
@@ -630,7 +631,7 @@ public interface TaskRunnerEventsListener {
630631
public static void register(TaskRunnerEventsListener listener,
631632
EventDispatcher<TaskRunnerEvent> dispatcher) {
632633
// ... existing registrations ...
633-
dispatcher.register(TaskRetried.class, listener::consume);
634+
dispatcher.register(TaskRetried.class, listener, listener::consume);
634635
}
635636
```
636637

@@ -860,14 +861,19 @@ CloudWatchMetricsCollector cloudwatch = new CloudWatchMetricsCollector(
860861
"ConductorMetrics"
861862
);
862863

863-
// Register all collectors
864+
// **Important:** `withMetricsCollector` can only be called once effectively -- each call
865+
// replaces the previous collector. Only the last one will be properly unregistered during
866+
// shutdown(). To use multiple collectors, register additional ones via `withListener` calls
867+
// or directly through `ListenerRegister`.
868+
869+
// Use withMetricsCollector for the primary collector
864870
TaskRunnerConfigurer configurer = new TaskRunnerConfigurer.Builder(taskClient, workers)
865871
.withMetricsCollector(prometheus)
866-
.withMetricsCollector(datadog)
867-
.withMetricsCollector(cloudwatch)
868872
.build();
869873

870-
configurer.init();
874+
// Register additional collectors directly on the TaskClient/WorkflowClient
875+
// event dispatchers, or use individual withListener calls:
876+
// builder.withListener(TaskExecutionCompleted.class, datadog::consume);
871877
```
872878

873879
### Building Custom Observability Solutions
@@ -1142,15 +1148,16 @@ public class WorkerScaler implements TaskRunnerEventsListener {
11421148

11431149
### Async Event Publishing
11441150

1145-
All events are published asynchronously using `CompletableFuture.runAsync()`, which ensures:
1151+
All events are published asynchronously using `CompletableFuture.runAsync()` with a dedicated static single-thread daemon executor (`conductor-event-dispatch`), which ensures:
11461152
- **Non-blocking**: Task execution is never blocked by event processing
11471153
- **Independent failure**: If a listener throws an exception, it doesn't affect task execution
1148-
- **Scalability**: Event processing can scale independently
1154+
- **Serialized dispatch**: All events are processed in order on a single thread; listeners should remain lightweight to avoid becoming a bottleneck
1155+
1156+
For cases where async dispatch is unsafe (e.g. inside an `UncaughtExceptionHandler`), the `publishSync()` method dispatches directly on the calling thread.
11491157

11501158
### Thread Safety
11511159

1152-
- **CopyOnWriteArrayList**: Used for listener storage, optimized for read-heavy workloads
1153-
- **ConcurrentHashMap**: Used for event type to listener mappings
1160+
- **ConcurrentHashMap**: Used for both event-type-to-listeners mapping and per-event-type listener storage (keyed by listener identity)
11541161
- **Immutable Events**: All event objects are immutable (final fields), preventing race conditions
11551162

11561163
### Memory Considerations

conductor-client/src/main/java/com/netflix/conductor/client/automator/TaskRunnerConfigurer.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,15 @@ public static class Builder {
213213
private final EventDispatcher<TaskRunnerEvent> eventDispatcher = new EventDispatcher<>();
214214
private boolean useVirtualThreads;
215215

216+
/**
217+
* Returns the event dispatcher used by this builder, allowing direct
218+
* listener registration via {@link ListenerRegister} for implementations
219+
* of {@link TaskRunnerEventsListener} or other custom listener interfaces.
220+
*/
221+
public EventDispatcher<TaskRunnerEvent> getEventDispatcher() {
222+
return eventDispatcher;
223+
}
224+
216225
private MetricsCollector metricsCollector;
217226
private boolean metricsCollectorExplicitlySet;
218227

0 commit comments

Comments
 (0)