Skip to content

Commit 144349f

Browse files
Add SDK metrics hooks
1 parent 85ba5b8 commit 144349f

8 files changed

Lines changed: 401 additions & 22 deletions

File tree

README.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,24 @@ async def main():
5757
- **Polyglot**: Works alongside PHP workers on the same task queue
5858
- **HTTP/JSON protocol**: No gRPC, no protobuf dependencies
5959
- **Codec envelopes**: Avro payloads by default, with JSON decode compatibility for existing history
60+
- **Metrics hooks**: Pluggable counters and histograms, with an optional Prometheus adapter
61+
62+
## Metrics
63+
64+
Pass a recorder to `Client(metrics=...)` or `Worker(metrics=...)` to collect request, poll, and task metrics. The SDK ships a no-op default, an `InMemoryMetrics` recorder for tests or custom exporter loops, and `PrometheusMetrics` for deployments that install the optional extra:
65+
66+
```bash
67+
pip install 'durable-workflow[prometheus]'
68+
```
69+
70+
```python
71+
from durable_workflow import Client, PrometheusMetrics
72+
73+
metrics = PrometheusMetrics()
74+
client = Client("http://server:8080", token="dev-token-123", metrics=metrics)
75+
```
76+
77+
Custom recorders implement `increment(name, value=1.0, tags=None)` and `record(name, value, tags=None)`.
6078

6179
## Documentation
6280

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ dev = [
4242
"mypy>=1.10",
4343
"ruff>=0.4",
4444
]
45+
prometheus = [
46+
"prometheus-client>=0.20",
47+
]
4548

4649
[project.urls]
4750
Homepage = "https://github.com/durable-workflow/sdk-python"

src/durable_workflow/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@
4040
WorkflowNotFound,
4141
WorkflowTerminated,
4242
)
43+
from .metrics import (
44+
InMemoryMetrics,
45+
MetricsRecorder,
46+
NoopMetrics,
47+
PrometheusMetrics,
48+
)
4349
from .worker import Worker
4450
from .workflow import ContinueAsNew, StartChildWorkflow
4551

@@ -71,8 +77,12 @@
7177
"workflow",
7278
"DurableWorkflowError",
7379
"InvalidArgument",
80+
"InMemoryMetrics",
81+
"MetricsRecorder",
7482
"NamespaceNotFound",
83+
"NoopMetrics",
7584
"QueryFailed",
85+
"PrometheusMetrics",
7686
"ServerError",
7787
"Unauthorized",
7888
"UpdateRejected",

src/durable_workflow/client.py

Lines changed: 65 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import time
45
from dataclasses import dataclass
56
from importlib.metadata import PackageNotFoundError
67
from importlib.metadata import version as _pkg_version
@@ -16,6 +17,7 @@
1617
WorkflowTerminated,
1718
_raise_for_status,
1819
)
20+
from .metrics import CLIENT_REQUEST_DURATION_SECONDS, CLIENT_REQUESTS, NOOP_METRICS, MetricsRecorder
1921
from .retry_policy import RetryPolicy
2022

2123
PROTOCOL_VERSION = "1.0"
@@ -34,6 +36,28 @@ def _default_sdk_version() -> str:
3436
DEFAULT_SDK_VERSION = _default_sdk_version()
3537

3638

39+
def _route_for_metrics(path: str) -> str:
40+
clean_path = path.split("?", 1)[0]
41+
parts = [part for part in clean_path.strip("/").split("/") if part]
42+
if not parts:
43+
return "/"
44+
45+
if parts[0] == "workflows" and len(parts) >= 2:
46+
parts[1] = "{workflow_id}"
47+
if len(parts) >= 4 and parts[2] in {"signal", "query", "update"}:
48+
parts[3] = "{name}"
49+
if len(parts) >= 4 and parts[2] == "runs":
50+
parts[3] = "{run_id}"
51+
elif parts[0] == "schedules" and len(parts) >= 2:
52+
parts[1] = "{schedule_id}"
53+
elif (
54+
parts[:2] == ["worker", "workflow-tasks"] or parts[:2] == ["worker", "activity-tasks"]
55+
) and len(parts) >= 3:
56+
parts[2] = "{task_id}"
57+
58+
return "/" + "/".join(parts)
59+
60+
3761
@dataclass
3862
class WorkflowExecution:
3963
workflow_id: str
@@ -250,11 +274,13 @@ def __init__(
250274
namespace: str = "default",
251275
timeout: float = 60.0,
252276
retry_policy: RetryPolicy | None = None,
277+
metrics: MetricsRecorder | None = None,
253278
) -> None:
254279
self.base_url = base_url.rstrip("/")
255280
self.token = token
256281
self.namespace = namespace
257282
self.retry_policy = retry_policy or RetryPolicy()
283+
self.metrics = metrics or NOOP_METRICS
258284
self._http = httpx.AsyncClient(base_url=self.base_url, timeout=timeout)
259285

260286
async def aclose(self) -> None:
@@ -287,6 +313,12 @@ async def _request(
287313
timeout: float | None = None,
288314
context: str = "",
289315
) -> Any:
316+
start = time.perf_counter()
317+
route = _route_for_metrics(path)
318+
plane = "worker" if worker else "control"
319+
status_code = "none"
320+
outcome = "pending"
321+
290322
async def _do_request() -> httpx.Response:
291323
resp = await self._http.request(
292324
method,
@@ -300,19 +332,40 @@ async def _do_request() -> httpx.Response:
300332
return resp
301333

302334
try:
303-
resp = await self.retry_policy.execute(_do_request)
304-
except httpx.HTTPStatusError as exc:
305-
# Convert to our custom exception types
306335
try:
307-
body = exc.response.json()
308-
except ValueError:
309-
body = exc.response.text
310-
_raise_for_status(exc.response.status_code, body, context=context)
311-
raise # unreachable, but keeps type checker happy
312-
313-
if resp.status_code == 204 or not resp.content:
314-
return None
315-
return resp.json()
336+
resp = await self.retry_policy.execute(_do_request)
337+
except httpx.HTTPStatusError as exc:
338+
status_code = str(exc.response.status_code)
339+
outcome = "http_error"
340+
# Convert to our custom exception types
341+
try:
342+
body = exc.response.json()
343+
except ValueError:
344+
body = exc.response.text
345+
_raise_for_status(exc.response.status_code, body, context=context)
346+
raise # unreachable, but keeps type checker happy
347+
348+
status_code = str(resp.status_code)
349+
if resp.status_code == 204 or not resp.content:
350+
outcome = "ok"
351+
return None
352+
result = resp.json()
353+
outcome = "ok"
354+
return result
355+
except Exception as exc:
356+
if outcome == "pending":
357+
outcome = type(exc).__name__
358+
raise
359+
finally:
360+
tags = {
361+
"method": method.upper(),
362+
"route": route,
363+
"plane": plane,
364+
"status_code": status_code,
365+
"outcome": outcome,
366+
}
367+
self.metrics.increment(CLIENT_REQUESTS, tags=tags)
368+
self.metrics.record(CLIENT_REQUEST_DURATION_SECONDS, time.perf_counter() - start, tags=tags)
316369

317370
async def get_cluster_info(self) -> dict[str, Any]:
318371
"""Fetch server build identity, capabilities, and protocol manifests."""

src/durable_workflow/metrics.py

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
from __future__ import annotations
2+
3+
from collections.abc import Mapping
4+
from dataclasses import dataclass, field
5+
from importlib import import_module
6+
from typing import Any, Protocol, TypeAlias
7+
8+
MetricTags: TypeAlias = Mapping[str, str]
9+
MetricKey: TypeAlias = tuple[str, tuple[tuple[str, str], ...]]
10+
11+
CLIENT_REQUESTS = "durable_workflow_client_requests"
12+
CLIENT_REQUEST_DURATION_SECONDS = "durable_workflow_client_request_duration_seconds"
13+
WORKER_POLLS = "durable_workflow_worker_polls"
14+
WORKER_POLL_DURATION_SECONDS = "durable_workflow_worker_poll_duration_seconds"
15+
WORKER_TASKS = "durable_workflow_worker_tasks"
16+
WORKER_TASK_DURATION_SECONDS = "durable_workflow_worker_task_duration_seconds"
17+
18+
19+
class MetricsRecorder(Protocol):
20+
"""Pluggable counter and histogram recorder used by the client and worker."""
21+
22+
def increment(self, name: str, value: float = 1.0, tags: MetricTags | None = None) -> None:
23+
"""Increment a counter metric."""
24+
25+
def record(self, name: str, value: float, tags: MetricTags | None = None) -> None:
26+
"""Record a histogram/sample metric."""
27+
28+
29+
class NoopMetrics:
30+
"""Default metrics recorder that intentionally drops all observations."""
31+
32+
def increment(self, name: str, value: float = 1.0, tags: MetricTags | None = None) -> None:
33+
pass
34+
35+
def record(self, name: str, value: float, tags: MetricTags | None = None) -> None:
36+
pass
37+
38+
39+
NOOP_METRICS = NoopMetrics()
40+
41+
42+
def _metric_key(name: str, tags: MetricTags | None) -> MetricKey:
43+
return name, tuple(sorted((str(k), str(v)) for k, v in (tags or {}).items()))
44+
45+
46+
@dataclass
47+
class InMemoryMetrics:
48+
"""Simple recorder useful for tests and custom exporter loops."""
49+
50+
counters: dict[MetricKey, float] = field(default_factory=dict)
51+
histograms: dict[MetricKey, list[float]] = field(default_factory=dict)
52+
53+
def increment(self, name: str, value: float = 1.0, tags: MetricTags | None = None) -> None:
54+
key = _metric_key(name, tags)
55+
self.counters[key] = self.counters.get(key, 0.0) + value
56+
57+
def record(self, name: str, value: float, tags: MetricTags | None = None) -> None:
58+
self.histograms.setdefault(_metric_key(name, tags), []).append(value)
59+
60+
def counter_value(self, name: str, tags: MetricTags | None = None) -> float:
61+
return self.counters.get(_metric_key(name, tags), 0.0)
62+
63+
def observations(self, name: str, tags: MetricTags | None = None) -> list[float]:
64+
return list(self.histograms.get(_metric_key(name, tags), []))
65+
66+
67+
class PrometheusMetrics:
68+
"""Metrics recorder backed by the optional prometheus-client package."""
69+
70+
def __init__(self, *, registry: Any | None = None) -> None:
71+
try:
72+
prometheus_client = import_module("prometheus_client")
73+
except ImportError as exc:
74+
raise RuntimeError(
75+
"PrometheusMetrics requires prometheus-client. "
76+
"Install it with `pip install durable-workflow[prometheus]`."
77+
) from exc
78+
79+
self._counter_cls: Any = prometheus_client.Counter
80+
self._histogram_cls: Any = prometheus_client.Histogram
81+
self._registry = registry
82+
self._counters: dict[str, Any] = {}
83+
self._histograms: dict[str, Any] = {}
84+
self._label_names: dict[tuple[str, str], tuple[str, ...]] = {}
85+
86+
def increment(self, name: str, value: float = 1.0, tags: MetricTags | None = None) -> None:
87+
tag_values = dict(_metric_key(name, tags)[1])
88+
counter = self._metric("counter", name, tuple(tag_values))
89+
if tag_values:
90+
counter.labels(**tag_values).inc(value)
91+
else:
92+
counter.inc(value)
93+
94+
def record(self, name: str, value: float, tags: MetricTags | None = None) -> None:
95+
tag_values = dict(_metric_key(name, tags)[1])
96+
histogram = self._metric("histogram", name, tuple(tag_values))
97+
if tag_values:
98+
histogram.labels(**tag_values).observe(value)
99+
else:
100+
histogram.observe(value)
101+
102+
def _metric(self, kind: str, name: str, label_names: tuple[str, ...]) -> Any:
103+
key = (kind, name)
104+
existing = self._label_names.get(key)
105+
if existing is not None and existing != label_names:
106+
raise ValueError(
107+
f"metric {name!r} was already registered as a {kind} "
108+
f"with labels {existing!r}; got {label_names!r}"
109+
)
110+
self._label_names[key] = label_names
111+
112+
store = self._counters if kind == "counter" else self._histograms
113+
if name in store:
114+
return store[name]
115+
116+
kwargs: dict[str, Any] = {}
117+
if self._registry is not None:
118+
kwargs["registry"] = self._registry
119+
metric_cls = self._counter_cls if kind == "counter" else self._histogram_cls
120+
metric = metric_cls(name, f"{name} {kind}", label_names, **kwargs)
121+
store[name] = metric
122+
return metric

src/durable_workflow/sync.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
WorkflowHandle,
1818
WorkflowList,
1919
)
20+
from .metrics import MetricsRecorder
21+
from .retry_policy import RetryPolicy
2022

2123

2224
def _run(coro: Any) -> Any:
@@ -139,8 +141,17 @@ def __init__(
139141
token: str | None = None,
140142
namespace: str = "default",
141143
timeout: float = 60.0,
144+
retry_policy: RetryPolicy | None = None,
145+
metrics: MetricsRecorder | None = None,
142146
) -> None:
143-
self._async = AsyncClient(base_url, token=token, namespace=namespace, timeout=timeout)
147+
self._async = AsyncClient(
148+
base_url,
149+
token=token,
150+
namespace=namespace,
151+
timeout=timeout,
152+
retry_policy=retry_policy,
153+
metrics=metrics,
154+
)
144155

145156
def close(self) -> None:
146157
_run(self._async.aclose())

0 commit comments

Comments
 (0)