Skip to content

Commit 04962b3

Browse files
made 8 methods default noops, update doc
1 parent a28a924 commit 04962b3

2 files changed

Lines changed: 83 additions & 23 deletions

File tree

INTERCEPTOR.md

Lines changed: 77 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,41 @@ ConductorClientEvent (abstract)
9191
│ │ • String workerId
9292
│ │ • Duration duration
9393
│ │
94-
│ └── TaskExecutionFailure
95-
│ • String taskId
96-
│ • String workerId
97-
│ • Throwable cause
98-
│ • Duration duration
94+
│ ├── TaskExecutionFailure
95+
│ │ • String taskId
96+
│ │ • String workerId
97+
│ │ • Throwable cause
98+
│ │ • Duration duration
99+
│ │
100+
│ ├── TaskUpdateCompleted
101+
│ │ • String taskId
102+
│ │ • String workerId
103+
│ │ • String workflowInstanceId
104+
│ │ • Duration duration
105+
│ │
106+
│ ├── TaskUpdateFailure
107+
│ │ • String taskId
108+
│ │ • String workerId
109+
│ │ • String workflowInstanceId
110+
│ │ • Duration duration
111+
│ │ • Throwable cause
112+
│ │
113+
│ ├── TaskAckFailure
114+
│ │ • String taskId
115+
│ │
116+
│ ├── TaskAckError
117+
│ │ • String taskId
118+
│ │ • Throwable cause
119+
│ │
120+
│ ├── TaskExecutionQueueFull
121+
│ │
122+
│ ├── TaskPaused
123+
│ │
124+
│ ├── ThreadUncaughtException
125+
│ │ • Throwable cause
126+
│ │
127+
│ └── ActiveWorkersChanged
128+
│ • int count
99129
100130
├── WorkflowClientEvent (abstract)
101131
│ │ • String name
@@ -123,6 +153,10 @@ ConductorClientEvent (abstract)
123153
• long size
124154
```
125155

156+
All `consume()` methods in `TaskRunnerEventsListener` are `default` no-ops, so implementations only need to override the events they care about.
157+
158+
For the full Prometheus metrics catalog (counters, timers, gauges, and size histograms), see [`conductor-client-metrics/README.md`](conductor-client-metrics/README.md).
159+
126160
## Core Components
127161

128162
### 1. EventDispatcher\<T\>
@@ -140,17 +174,28 @@ The core event routing component that manages listener registration and event pu
140174
**API**:
141175
```java
142176
public class EventDispatcher<T extends ConductorClientEvent> {
143-
// Register a listener for a specific event type
177+
// Register a listener for a specific event type (consumer itself is the key)
144178
public <U extends T> void register(Class<U> clazz, Consumer<U> listener);
145179

146-
// Unregister a listener
180+
// Register a listener under an explicit key (idempotent per clazz+key)
181+
public <U extends T> void register(Class<U> clazz, Object key, Consumer<U> listener);
182+
183+
// Unregister a listener by consumer reference
147184
public <U extends T> void unregister(Class<U> clazz, Consumer<U> listener);
148185

149-
// Publish an event (async)
186+
// Unregister a listener by key
187+
public <U extends T> void unregister(Class<U> clazz, Object key);
188+
189+
// Publish an event asynchronously
150190
public void publish(T event);
191+
192+
// Publish an event on the calling thread (for use in UncaughtExceptionHandler, etc.)
193+
public void publishSync(T event);
151194
}
152195
```
153196

197+
The 3-arg `register(Class, Object, Consumer)` form is used by `ListenerRegister` with the listener object as the key, making bulk registration idempotent — calling it twice with the same listener instance is a safe no-op.
198+
154199
### 2. Listener Interfaces
155200

156201
#### TaskRunnerEventsListener
@@ -161,12 +206,15 @@ Defines callbacks for task runner lifecycle events.
161206

162207
```java
163208
public interface TaskRunnerEventsListener {
164-
void consume(PollFailure e);
165-
void consume(PollCompleted e);
166-
void consume(PollStarted e);
167-
void consume(TaskExecutionStarted e);
168-
void consume(TaskExecutionCompleted e);
169-
void consume(TaskExecutionFailure e);
209+
default void consume(PollFailure e) {}
210+
default void consume(PollCompleted e) {}
211+
default void consume(PollStarted e) {}
212+
default void consume(TaskExecutionStarted e) {}
213+
default void consume(TaskExecutionCompleted e) {}
214+
default void consume(TaskExecutionFailure e) {}
215+
// ... plus default no-ops for TaskUpdateCompleted, TaskUpdateFailure,
216+
// TaskAckFailure, TaskAckError, TaskExecutionQueueFull,
217+
// TaskPaused, ThreadUncaughtException, ActiveWorkersChanged
170218
}
171219
```
172220

@@ -216,7 +264,7 @@ public interface MetricsCollector
216264

217265
**Location**: `conductor-client/src/main/java/com/netflix/conductor/client/events/listeners/ListenerRegister.java`
218266

219-
Utility class for bulk registration of listeners with event dispatchers.
267+
Utility class for bulk registration of listeners with event dispatchers. Internally, each method calls `dispatcher.register(EventClass.class, listener, listener::consume)` — using the listener object as the key — so calling `register` multiple times with the same `(listener, dispatcher)` pair is a safe no-op.
220268

221269
```java
222270
public class ListenerRegister {
@@ -308,6 +356,11 @@ For the complete metric catalog and setup instructions, see [`conductor-client-m
308356
│ eventDispatcher.publish( │
309357
│ new WorkflowPayloadUsedEvent(name, version, │
310358
│ "WRITE", "WORKFLOW_INPUT")) │
359+
│ if IOException during serialization/upload: │
360+
│ eventDispatcher.publish( │
361+
│ new WorkflowStartedEvent(name, version, │
362+
│ false, error)) │
363+
│ throw ConductorClientException │
311364
└─────────────────────────────────────────────────────────────────┘
312365
313366
┌─────────────────────────────────────────────────────────────────┐
@@ -318,11 +371,16 @@ For the complete metric catalog and setup instructions, see [`conductor-client-m
318371
│ WorkflowClient.startWorkflow() │
319372
│ • Success: eventDispatcher.publish( │
320373
│ new WorkflowStartedEvent(name, version)) │
321-
│ • Failure: eventDispatcher.publish( │
322-
│ new WorkflowStartedEvent(name, version, false, error)) │
323374
└─────────────────────────────────────────────────────────────────┘
324375
```
325376

377+
> Note: The failure `WorkflowStartedEvent` is published from
378+
> `checkAndUploadToExternalStorage()`, not from `startWorkflow()`. It fires
379+
> only when an `IOException` occurs during payload serialization or external
380+
> storage upload. If `client.execute()` itself fails (HTTP error, network
381+
> timeout), no `WorkflowStartedEvent` is published — the exception propagates
382+
> directly to the caller.
383+
326384
> Note: `WorkflowInputPayloadSizeEvent` is no longer published from
327385
> `WorkflowClient` — the canonical `workflow_input_size_bytes` histogram is
328386
> populated at wire time by `ApiClientMetrics`, which avoids serializing the
@@ -627,6 +685,8 @@ public interface TaskRunnerEventsListener {
627685

628686
#### 4. Update ListenerRegister (If Updated Interface)
629687

688+
Use the 3-arg `register` with the listener as the key for idempotent registration:
689+
630690
```java
631691
public static void register(TaskRunnerEventsListener listener,
632692
EventDispatcher<TaskRunnerEvent> dispatcher) {

conductor-client/src/main/java/com/netflix/conductor/client/events/listeners/TaskRunnerEventsListener.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,17 @@
2929

3030
public interface TaskRunnerEventsListener {
3131

32-
void consume(PollFailure e);
32+
default void consume(PollFailure e) {}
3333

34-
void consume(PollCompleted e);
34+
default void consume(PollCompleted e) {}
3535

36-
void consume(PollStarted e);
36+
default void consume(PollStarted e) {}
3737

38-
void consume(TaskExecutionStarted e);
38+
default void consume(TaskExecutionStarted e) {}
3939

40-
void consume(TaskExecutionCompleted e);
40+
default void consume(TaskExecutionCompleted e) {}
4141

42-
void consume(TaskExecutionFailure e);
42+
default void consume(TaskExecutionFailure e) {}
4343

4444
default void consume(TaskUpdateCompleted e) {}
4545

0 commit comments

Comments
 (0)