Commit b271697

Merge pull request #114 from conductor-oss/gated-metrics-standardization
Standardized metrics export
2 parents 79e4c72 + 0499a8b commit b271697

40 files changed

Lines changed: 3069 additions & 191 deletions

.gitignore

Lines changed: 3 additions & 0 deletions
@@ -9,3 +9,6 @@ bin/
 .project
 .settings/
 .factorypath
+*.db
+*.db-shm
+*.db-wal

CHANGELOG.md

Lines changed: 18 additions & 0 deletions
@@ -2,6 +2,24 @@
 
 All notable changes to this project will be documented in this file.
 
+## [5.1.0]
+
+### Added
+
+- Standardized Prometheus metrics: `PrometheusMetricsCollector` now emits the harmonized cross-SDK metric surface — [details](conductor-client-metrics/README.md)
+- Automatic metrics wiring: `ConductorClient.Builder.withMetricsCollector(...)` installs the HTTP interceptor and auto-registers listeners on `TaskClient`, `WorkflowClient`, and `TaskRunnerConfigurer`
+- HTTP API client metrics via OkHttp interceptor (`http_api_client_request_seconds`, `task_result_size_bytes`, `workflow_input_size_bytes`)
+- Event-driven metrics architecture with `EventDispatcher` and typed event POJOs
+
+### Changed
+
+- `PrometheusMetricsCollector` metric names updated to the harmonized cross-SDK catalog (e.g. `task_poll_total`, `task_execute_time_seconds`)
+- `micrometer-registry-prometheus` is now a transitive (`api`) dependency
+
+### Deprecated
+
+- `TaskClient.ack(String, String)` — use `ack(String taskType, String taskId, String workerId)`
+
 ## [4.0.0] - 2024-10-09
 - New major release – [Read more](https://orkes.io/blog/conductor-java-client-v4/)
 

INTERCEPTOR.md

Lines changed: 154 additions & 70 deletions
Large diffs are not rendered by default.

README.md

Lines changed: 19 additions & 6 deletions
@@ -298,24 +298,37 @@ executor.initWorkers("com.mycompany.workers"); // Package to scan for @WorkerTa
 
 ## Monitoring Workers
 
-Enable metrics collection for monitoring workers:
+Enable Prometheus metrics collection for monitoring workers:
 
-```java
+```groovy
 // Using conductor-client-metrics module
 dependencies {
-    implementation 'org.conductoross:conductor-client-metrics:4.0.1'
+    implementation 'org.conductoross:conductor-client-metrics:5.1.0'
 }
 ```
 
 ```java
-// Configure metrics with Prometheus
+import com.netflix.conductor.client.metrics.prometheus.PrometheusMetricsCollector;
+
+PrometheusMetricsCollector metricsCollector = new PrometheusMetricsCollector();
+metricsCollector.startServer(); // http://localhost:9991/metrics
+
+ConductorClient client = ConductorClient.builder()
+        .basePath("...")
+        .withMetricsCollector(metricsCollector)
+        .build();
+
+TaskClient taskClient = new TaskClient(client);
+WorkflowClient workflowClient = new WorkflowClient(client);
+
 TaskRunnerConfigurer configurer = new TaskRunnerConfigurer.Builder(taskClient, workers)
         .withThreadCount(10)
-        .withMetricsCollector(new PrometheusMetricsCollector())
         .build();
 ```
 
-See [conductor-client-metrics/README.md](conductor-client-metrics/README.md) for full metrics documentation.
+When a `MetricsCollector` is attached to the `ConductorClient`, downstream clients (`TaskClient`, `WorkflowClient`, `TaskRunnerConfigurer`) automatically register themselves as event listeners.
+
+See [conductor-client-metrics/README.md](conductor-client-metrics/README.md) for the complete metric catalog and setup details.
 
 ## Workflows
 

conductor-client-metrics/README.md

Lines changed: 179 additions & 4 deletions
@@ -1,9 +1,184 @@
 # Conductor Client Metrics
 
-**Status: Incubating.**
+The `conductor-client-metrics` module provides Prometheus metrics for Java SDK clients and workers. It helps operators monitor worker polling, task execution, task result updates, payload sizes, workflow starts, and HTTP client latency.
 
-Provides metrics and monitoring capabilities for Conductor clients.
+## Installation
 
-It helps developers track the performance and health of their workers, offering insights into task execution times, error rates, and system throughput.
+Add the metrics module to the worker application:
 
-As an incubating module, it's still under development and subject to changes.
+```groovy
+dependencies {
+    implementation 'org.conductoross:conductor-client-metrics:5.1.0'
+}
+```
+
+## Usage
+
+Create a `PrometheusMetricsCollector`, start the scrape server, and pass the collector to `ConductorClient.Builder`. All downstream clients and the task runner auto-register themselves as listeners.
+
+```java
+import com.netflix.conductor.client.metrics.prometheus.PrometheusMetricsCollector;
+
+PrometheusMetricsCollector metricsCollector = new PrometheusMetricsCollector();
+metricsCollector.startServer(); // http://localhost:9991/metrics
+
+ConductorClient client = ConductorClient.builder()
+        .basePath("http://conductor-server:8080/api")
+        .withMetricsCollector(metricsCollector)
+        .build();
+
+TaskClient taskClient = new TaskClient(client);
+WorkflowClient workflowClient = new WorkflowClient(client);
+
+TaskRunnerConfigurer configurer = new TaskRunnerConfigurer.Builder(taskClient, workers)
+        .withThreadCount(10)
+        .build();
+configurer.init();
+```
+
+`startServer()` also accepts `(port, endpoint)` for custom scrape configurations. The client builder accepts the usual timeouts, SSL, authentication, and other options alongside `withMetricsCollector` -- none of them change how metrics wiring works.
+
+### How Auto-Registration Works
+
+When a `MetricsCollector` is passed to `ConductorClient.Builder.withMetricsCollector()`:
+
+1. The `ConductorClient` installs an OkHttp interceptor that records `http_api_client_request_seconds`, `task_result_size_bytes`, and `workflow_input_size_bytes`.
+2. `TaskClient` detects the collector from the `ConductorClient` it receives and calls `registerListener` and `registerTaskRunnerListener` on itself.
+3. `WorkflowClient` detects the collector from the `ConductorClient` it receives and calls `registerListener` on itself.
+4. `TaskRunnerConfigurer.Builder.build()` detects the collector from the `TaskClient`'s `ConductorClient` and registers task-runner events automatically, unless `withMetricsCollector` was called explicitly on the builder.
+
+All registrations are idempotent. If you call both `withMetricsCollector` on the builder and `registerListener` manually with the same collector, events are not duplicated.
+
+The collector exposes Prometheus text format from the embedded HTTP server. Metrics are created lazily, so a metric family appears after the corresponding worker or client event has occurred.
+
+### Manual Wiring
+
+For advanced use cases where you need fine-grained control over which listeners are registered where, or you want to mix the metrics collector with custom event listeners, create the `ConductorClient` without `withMetricsCollector` and register listeners explicitly:
+
+```java
+import com.netflix.conductor.client.metrics.prometheus.PrometheusMetricsCollector;
+
+PrometheusMetricsCollector metricsCollector = new PrometheusMetricsCollector();
+metricsCollector.startServer(); // http://localhost:9991/metrics
+
+ConductorClient client = ConductorClient.builder()
+        .basePath("http://conductor-server:8080/api")
+        .build();
+
+TaskClient taskClient = new TaskClient(client);
+taskClient.registerListener(metricsCollector);
+taskClient.registerTaskRunnerListener(metricsCollector);
+
+TaskRunnerConfigurer configurer = new TaskRunnerConfigurer.Builder(taskClient, workers)
+        .withThreadCount(10)
+        .withMetricsCollector(metricsCollector)
+        .build();
+
+configurer.init();
+
+WorkflowClient workflowClient = new WorkflowClient(client);
+workflowClient.registerListener(metricsCollector);
+```
+
+Note that manual wiring does not install the OkHttp interceptor for `http_api_client_request_seconds`, `task_result_size_bytes`, or `workflow_input_size_bytes`. Use `withMetricsCollector` on the builder for those metrics.
+
+### Event Dispatch Threading
+
+Events are dispatched asynchronously on a single shared daemon thread (`conductor-event-dispatch`). This avoids contention with the application's `ForkJoinPool.commonPool()`. Metrics collector listeners (counter increments, timer recordings) are lock-free and sub-microsecond, so the single thread keeps up under normal load. Custom listeners registered via `EventDispatcher.register()` must be non-blocking; a slow listener will delay delivery of all events across the process.
+
+## Metrics Catalog
+
+Time metrics use seconds and standard bucket boundaries. Size metrics use bytes and standard size bucket boundaries. Exception labels use bounded exception type names, not exception messages or stack traces.
+
+### Counters
+
+| Meter | Labels | Meaning |
+|---|---|---|
+| `task_poll_total` | `taskType` | Incremented each time a worker issues a poll request. |
+| `task_execution_started_total` | `taskType` | Incremented when a polled task is dispatched to the worker function. |
+| `task_poll_error_total` | `taskType`, `exception` | Incremented when polling fails with a client-side exception. |
+| `task_execute_error_total` | `taskType`, `exception` | Incremented when worker code throws while executing a task. |
+| `task_update_error_total` | `taskType`, `exception` | Incremented when reporting a task result back to Conductor fails. |
+| `task_ack_failed_total` | `taskType` | Incremented when an explicit task ack response is unsuccessful. The internal task runner uses batch poll responses as ack and may not emit this during normal polling. |
+| `task_ack_error_total` | `taskType`, `exception` | Incremented when an explicit task ack call throws. The internal task runner uses batch poll responses as ack and may not emit this during normal polling. |
+| `task_execution_queue_full_total` | `taskType` | Incremented when a poll cycle is skipped because all worker threads are busy (zero permits available). |
+| `task_paused_total` | `taskType` | Incremented when a worker is paused and skips acting on a poll. |
+| `thread_uncaught_exceptions_total` | `exception` | Incremented when a worker thread raises an uncaught exception. |
+| `external_payload_used_total` | `entityName`, `operation`, `payloadType` | Incremented when external payload storage is used for task or workflow payloads. |
+| `workflow_start_error_total` | `workflowType`, `exception` | Incremented when starting a workflow fails client-side. |
+
+### Time Metrics
+
+| Meter | Labels | Meaning |
+|---|---|---|
+| `task_poll_time_seconds` | `taskType`, `status` | Poll request latency. `status` is `SUCCESS` or `FAILURE`. |
+| `task_execute_time_seconds` | `taskType`, `status` | Worker function execution latency. `status` is `SUCCESS` or `FAILURE`. |
+| `task_update_time_seconds` | `taskType`, `status` | Latency for reporting a task result back to Conductor. `status` is `SUCCESS` or `FAILURE`. |
+| `http_api_client_request_seconds` | `method`, `uri`, `status` | Latency of HTTP requests made by the API client. `status` is the HTTP status code as a string, or `0` when no response status is available. |
+
+Time metrics use these service-level objective buckets, in seconds:
+
+```text
+0.001, 0.005, 0.010, 0.025, 0.050, 0.100, 0.250, 0.500, 1, 2.5, 5, 10
+```
+
+The `uri` label for `http_api_client_request_seconds` uses the path template (e.g. `/workflow/{workflowId}`, `/tasks/poll/batch/{taskType}`) rather than the resolved path. This keeps the label space bounded regardless of how many unique workflow or task IDs are processed.
+
+### Size Metrics
+
+| Meter | Labels | Meaning |
+|---|---|---|
+| `task_result_size_bytes` | `taskType` | Serialized task result output size, captured from `RequestBody.contentLength()` of the outbound `POST /tasks` (or `POST /tasks/update-v2`) request. `taskType` is empty when the caller used the single-argument `TaskClient.updateTask(TaskResult)` overload. |
+| `workflow_input_size_bytes` | `workflowType`, `version` | Serialized workflow input size, captured from `RequestBody.contentLength()` of the outbound `POST /workflow` request. `version` is an empty string when the workflow version is absent. |
+
+Both histograms are populated at wire time by the `ApiClientMetrics` OkHttp interceptor, reading a `PayloadKind` tag attached by `TaskClient`/`WorkflowClient`. The byte count is read off the request body the HTTP layer is about to send, so no extra JSON serialization is needed.
+
+Size metrics use these service-level objective buckets, in bytes:
+
+```text
+100, 1000, 10000, 100000, 1000000, 10000000
+```
+
+### Gauges
+
+| Meter | Labels | Meaning |
+|---|---|---|
+| `active_workers` | `taskType` | Current number of worker threads actively executing tasks. |
+
+### Micrometer `_max` Sidecars
+
+Micrometer publishes a `*_max` Gauge alongside every Timer and DistributionSummary. These appear in scrape output as e.g. `task_poll_time_seconds_max`, `task_result_size_bytes_max`. The `_max` tracks the maximum observed value within the current reporting interval. This is a Micrometer artifact, not part of the metric catalog; it is harmless and can be ignored by dashboards that don't use it.
+
+## Labels
+
+| Label | Used by | Values |
+|---|---|---|
+| `taskType` | Worker metrics | Task definition name. |
+| `workflowType` | Workflow metrics | Workflow definition name. |
+| `version` | `workflow_input_size_bytes` | Workflow version as a string. Empty string when the version is absent. |
+| `status` | Task time metrics | `SUCCESS` or `FAILURE`. For `http_api_client_request_seconds`, the HTTP status code as a string, or `0` when no response status is available. |
+| `exception` | Error counters | Exception type name, such as `SocketTimeoutException`. |
+| `entityName` | `external_payload_used_total` | Task type or workflow name associated with the external payload. |
+| `operation` | `external_payload_used_total` | External payload operation, such as `READ` or `WRITE`. |
+| `payloadType` | `external_payload_used_total` | Payload type, such as `TASK_INPUT`, `TASK_OUTPUT`, `WORKFLOW_INPUT`, or `WORKFLOW_OUTPUT`. |
+| `method` | HTTP metrics | HTTP verb. |
+| `uri` | HTTP metrics | Path template from the Java HTTP client (e.g. `/workflow/{workflowId}`). Resolved identifiers are not included, keeping cardinality bounded. |
+
+## Troubleshooting
+
+### Metrics Are Empty
+
+- Verify that the collector is wired into the client. The simplest check: was `withMetricsCollector` called on `ConductorClient.Builder`?
+- Verify workers have polled or executed tasks. Metrics are created lazily when the relevant event occurs.
+- Confirm the scrape endpoint is reachable at the expected host and port.
+
+### Missing HTTP or Size Metrics
+
+- `http_api_client_request_seconds` requires the HTTP interceptor, which is installed automatically when `withMetricsCollector` is called on the builder.
+- `task_result_size_bytes` and `workflow_input_size_bytes` likewise require the HTTP interceptor -- they are recorded at wire time from `RequestBody.contentLength()` for requests tagged with a `PayloadKind`.
+- `task_ack_failed_total` and `task_ack_error_total` require `taskClient.registerTaskRunnerListener(metricsCollector)`. This is automatic when using `withMetricsCollector` on the builder.
+
+### High Cardinality
+
+- The `uri` label on `http_api_client_request_seconds` uses the path template, so it is bounded by the number of distinct API endpoints (not by request volume or unique IDs). The interceptor falls back to the resolved path for requests that are not tagged with a template, which may be unbounded.
+- Avoid embedding user identifiers or unbounded values in task type, workflow type, or external payload labels.
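
The README's "Event Dispatch Threading" section and its note that registrations are idempotent can be illustrated with a small JDK-only sketch. This is not the SDK's `EventDispatcher`: the class `EventDispatchSketch` and its `register`, `publish`, and `flush` methods are hypothetical names chosen for illustration, and only the thread name `conductor-event-dispatch` is taken from the README. The sketch assumes that a set of listeners plus a single-threaded executor is one reasonable way to get both properties.

```java
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical stand-in for illustration only; not the real EventDispatcher API.
final class EventDispatchSketch<E> {

    // Set semantics make registration idempotent: adding the same listener twice is a no-op.
    private final Set<Consumer<E>> listeners = new CopyOnWriteArraySet<>();

    // One shared daemon thread, so dispatch never borrows ForkJoinPool.commonPool().
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "conductor-event-dispatch");
        t.setDaemon(true);
        return t;
    });

    /** Returns true only if the listener was not already registered. */
    boolean register(Consumer<E> listener) {
        return listeners.add(listener);
    }

    /** Queues the event for the dispatch thread and returns immediately. */
    void publish(E event) {
        executor.execute(() -> listeners.forEach(l -> l.accept(event)));
    }

    /** Blocks until everything published so far has been delivered (helper for demos). */
    void flush() {
        try {
            // The executor is FIFO, so this no-op completes after all earlier events.
            executor.submit(() -> { }).get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining", e);
        } catch (Exception e) {
            throw new IllegalStateException("dispatch thread did not drain", e);
        }
    }

    public static void main(String[] args) {
        EventDispatchSketch<String> dispatcher = new EventDispatchSketch<>();
        AtomicInteger delivered = new AtomicInteger();
        Consumer<String> collector = event -> delivered.incrementAndGet();

        dispatcher.register(collector);
        dispatcher.register(collector); // duplicate registration is ignored
        dispatcher.publish("task-polled");
        dispatcher.flush();

        System.out.println("deliveries: " + delivered.get());
    }
}
```

Because every listener runs on the same thread, a listener that blocks inside `accept` stalls all later events, which is the reason the README asks custom listeners to be non-blocking.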

conductor-client-metrics/build.gradle

Lines changed: 10 additions & 9 deletions
@@ -1,20 +1,21 @@
 plugins {
     id 'java-library'
     id 'idea'
-// id 'maven-publish'
-// id 'signing'
+    id 'maven-publish'
+    id 'signing'
 }
 
-//ext {
-// artifactName = 'Conductor Client Metrics'
-// artifactDescription = 'Conductor Client Metrics'
-//}
-//
-//apply plugin: 'publish-config'
+ext {
+    artifactName = 'Conductor Client Metrics'
+    artifactDescription = 'Conductor Client Metrics'
+}
+
+apply plugin: 'publish-config'
 
 dependencies {
-    implementation 'io.micrometer:micrometer-registry-prometheus:1.15.1'
+    api 'io.micrometer:micrometer-registry-prometheus:1.15.1'
     implementation project(":conductor-client")
+    implementation "com.squareup.okhttp3:okhttp:${versions.okHttp}"
 
     testImplementation 'org.mockito:mockito-core:5.12.0'
     testImplementation 'org.junit.jupiter:junit-jupiter-api:5.10.3'
Lines changed: 100 additions & 0 deletions
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2026 Conductor Authors.
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.client.metrics.prometheus;
+
+import java.time.Duration;
+
+import com.netflix.conductor.client.metrics.ApiClientMetrics;
+
+import io.micrometer.core.instrument.DistributionSummary;
+import io.micrometer.core.instrument.Timer;
+import io.micrometer.prometheusmetrics.PrometheusMeterRegistry;
+
+/**
+ * Prometheus-backed implementation of {@link ApiClientMetrics} that emits the
+ * canonical {@code http_api_client_request_seconds},
+ * {@code task_result_size_bytes}, and {@code workflow_input_size_bytes}
+ * histograms.
+ */
+public final class PrometheusApiClientMetrics implements ApiClientMetrics {
+
+    private static final Duration[] CANONICAL_TIME_BUCKETS = {
+        Duration.ofMillis(1),
+        Duration.ofMillis(5),
+        Duration.ofMillis(10),
+        Duration.ofMillis(25),
+        Duration.ofMillis(50),
+        Duration.ofMillis(100),
+        Duration.ofMillis(250),
+        Duration.ofMillis(500),
+        Duration.ofSeconds(1),
+        Duration.ofMillis(2500),
+        Duration.ofSeconds(5),
+        Duration.ofSeconds(10),
+    };
+
+    private static final double[] CANONICAL_SIZE_BUCKETS = {
+        100, 1_000, 10_000, 100_000, 1_000_000, 10_000_000
+    };
+
+    private final PrometheusMeterRegistry registry;
+
+    public PrometheusApiClientMetrics(PrometheusMeterRegistry registry) {
+        this.registry = registry;
+    }
+
+    @Override
+    public void recordRequest(String method, String uri, int statusCode, Duration duration) {
+        String statusLabel = statusCode <= 0 ? "0" : Integer.toString(statusCode);
+        Timer.builder("http_api_client_request_seconds")
+                .description("HTTP API client request latency in seconds")
+                .tag("method", nullToEmpty(method))
+                .tag("uri", nullToEmpty(uri))
+                .tag("status", statusLabel)
+                .publishPercentileHistogram(false)
+                .serviceLevelObjectives(CANONICAL_TIME_BUCKETS)
+                .register(registry)
+                .record(duration);
+    }
+
+    @Override
+    public void recordTaskResultSize(String taskType, long sizeBytes) {
+        if (sizeBytes < 0) {
+            return;
+        }
+        DistributionSummary.builder("task_result_size_bytes")
+                .description("Records output payload size of a task in bytes")
+                .tag("taskType", nullToEmpty(taskType))
+                .serviceLevelObjectives(CANONICAL_SIZE_BUCKETS)
+                .register(registry)
+                .record(sizeBytes);
+    }
+
+    @Override
+    public void recordWorkflowInputSize(String workflowType, Integer version, long sizeBytes) {
+        if (sizeBytes < 0) {
+            return;
+        }
+        DistributionSummary.builder("workflow_input_size_bytes")
+                .description("Records input payload size of a workflow in bytes")
+                .tag("workflowType", nullToEmpty(workflowType))
+                .tag("version", version == null ? "" : version.toString())
+                .serviceLevelObjectives(CANONICAL_SIZE_BUCKETS)
+                .register(registry)
+                .record(sizeBytes);
+    }
+
+    private static String nullToEmpty(String s) {
+        return s == null ? "" : s;
+    }
+}
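
To make the size buckets in `CANONICAL_SIZE_BUCKETS` more concrete, the following JDK-only sketch shows how Prometheus-style cumulative (`le`) buckets count observations against those boundaries. It is an illustration under stated assumptions, not Micrometer code: `SizeBucketSketch` and its methods are hypothetical names, and the only things taken from the diff are the six boundary values and the guard that skips negative sizes.

```java
import java.util.Arrays;

// Hypothetical illustration of cumulative ("le") bucket counting; not Micrometer code.
final class SizeBucketSketch {

    // Same boundaries as CANONICAL_SIZE_BUCKETS in the diff, in bytes.
    static final long[] BOUNDS = {100, 1_000, 10_000, 100_000, 1_000_000, 10_000_000};

    private final long[] cumulative = new long[BOUNDS.length];
    private long count; // plays the role of the +Inf bucket

    void record(long sizeBytes) {
        if (sizeBytes < 0) {
            return; // mirrors the guard in recordTaskResultSize: negative sizes are skipped
        }
        count++;
        // A value is counted in every bucket whose upper bound is >= the value.
        for (int i = 0; i < BOUNDS.length; i++) {
            if (sizeBytes <= BOUNDS[i]) {
                cumulative[i]++;
            }
        }
    }

    long[] cumulativeCounts() {
        return cumulative.clone();
    }

    long totalCount() {
        return count;
    }

    public static void main(String[] args) {
        SizeBucketSketch histogram = new SizeBucketSketch();
        histogram.record(50);          // falls in every bucket
        histogram.record(1_500);       // first counted at the 10_000 bound
        histogram.record(20_000_000);  // larger than every bound, only in the total
        histogram.record(-1);          // skipped
        // prints "[1, 1, 2, 2, 2, 2] total=3"
        System.out.println(Arrays.toString(histogram.cumulativeCounts())
                + " total=" + histogram.totalCount());
    }
}
```

A payload larger than 10,000,000 bytes shows up only in the total count, so dashboards built on these buckets cannot resolve sizes above the last boundary.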
