Skip to content

Commit fc6de75

Browse files
authored
Merge branch 'main' into migrate-to-uv
2 parents fc98eb9 + 9717333 commit fc6de75

9 files changed

Lines changed: 650 additions & 4 deletions

File tree

examples/workflow/README.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,3 +508,31 @@ dapr run --app-id wf-versioning-example -- python3 versioning.py part1
508508
dapr run --app-id wf-versioning-example --log-level debug -- python3 versioning.py part2
509509
```
510510
<!--END_STEP-->
511+
512+
### Pydantic models as workflow/activity inputs
513+
514+
This example shows how to pass [Pydantic](https://docs.pydantic.dev/) `BaseModel`
515+
instances directly as workflow and activity inputs. When a workflow or activity
516+
annotates its input parameter with a `BaseModel` subclass, the runtime
517+
reconstructs the model from the decoded JSON payload automatically — no manual
518+
`model_validate` call is needed at the receiving side.
519+
520+
The wire format remains plain JSON, so workflows and activities stay
521+
interop-friendly with non-Python Dapr apps. Outputs coming back from activities
522+
arrive as dicts; reconstructing them into a typed instance is a one-liner
523+
(`OrderResult.model_validate(...)`).
524+
525+
<!--STEP
526+
name: Run the pydantic models example
527+
expected_stdout_lines:
528+
- "[workflow] received order O-100 for Acme amount=42.0"
529+
- "[activity] approving order O-100"
530+
- "[workflow] activity returned approved=True"
531+
- "[client] workflow output: order_id=O-100 approved=True message=auto-approved"
532+
timeout_seconds: 60
533+
-->
534+
535+
```sh
536+
dapr run --app-id wf-pydantic-example -- python3 pydantic_models.py
537+
```
538+
<!--END_STEP-->
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2026 The Dapr Authors
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
"""Native Pydantic model support in Dapr workflows and activities.
13+
14+
Inputs annotated with a Pydantic BaseModel are reconstructed automatically on
15+
the receiving side — no manual serialization is needed. Outputs are emitted
16+
as plain JSON so the wire format stays interop-friendly with non-Python Dapr
17+
apps.
18+
"""
19+
20+
from dapr.ext.workflow import (
21+
DaprWorkflowClient,
22+
DaprWorkflowContext,
23+
WorkflowActivityContext,
24+
WorkflowRuntime,
25+
)
26+
from pydantic import BaseModel
27+
28+
29+
class OrderRequest(BaseModel):
30+
order_id: str
31+
customer: str
32+
amount: float
33+
34+
35+
class OrderResult(BaseModel):
36+
order_id: str
37+
approved: bool
38+
message: str
39+
40+
41+
wfr = WorkflowRuntime()
42+
instance_id = 'pydantic-demo'
43+
44+
45+
@wfr.workflow(name='order_workflow')
46+
def order_workflow(ctx: DaprWorkflowContext, order: OrderRequest):
47+
# `order` arrives as a real OrderRequest instance — the runtime reads the
48+
# annotation and reconstructs the model from the decoded JSON automatically.
49+
if not ctx.is_replaying:
50+
print(
51+
f'[workflow] received order {order.order_id} '
52+
f'for {order.customer} amount={order.amount}',
53+
flush=True,
54+
)
55+
raw = yield ctx.call_activity(approve_order, input=order)
56+
# Activity results come back as a plain dict. One line turns them into a
57+
# typed instance.
58+
result = OrderResult.model_validate(raw)
59+
if not ctx.is_replaying:
60+
print(
61+
f'[workflow] activity returned approved={result.approved}',
62+
flush=True,
63+
)
64+
return result
65+
66+
67+
@wfr.activity(name='approve_order')
68+
def approve_order(ctx: WorkflowActivityContext, order: OrderRequest) -> OrderResult:
69+
# Same story: `order` is already an OrderRequest instance here.
70+
print(f'[activity] approving order {order.order_id}', flush=True)
71+
if order.amount <= 100.0:
72+
return OrderResult(order_id=order.order_id, approved=True, message='auto-approved')
73+
return OrderResult(order_id=order.order_id, approved=False, message='needs review')
74+
75+
76+
def main():
77+
wfr.start()
78+
client = DaprWorkflowClient()
79+
80+
order = OrderRequest(order_id='O-100', customer='Acme', amount=42.0)
81+
client.schedule_new_workflow(workflow=order_workflow, input=order, instance_id=instance_id)
82+
state = client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
83+
84+
# state.serialized_output is a JSON string — reconstruct a typed instance.
85+
output = OrderResult.model_validate_json(state.serialized_output)
86+
print(
87+
f'[client] workflow output: order_id={output.order_id} '
88+
f'approved={output.approved} message={output.message}',
89+
flush=True,
90+
)
91+
92+
client.purge_workflow(instance_id)
93+
wfr.shutdown()
94+
95+
96+
if __name__ == '__main__':
97+
main()

examples/workflow/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
dapr-ext-workflow>=1.17.0.dev
22
dapr>=1.17.0.dev
3+
pydantic>=2.0

ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/shared.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from typing import Any, Optional, Sequence, Union
1818

1919
import grpc
20+
from dapr.ext.workflow import _model_protocol
2021

2122
ClientInterceptor = Union[
2223
grpc.UnaryUnaryClientInterceptor,
@@ -156,6 +157,16 @@ def encode(self, obj: Any) -> str:
156157
return super().encode(obj)
157158

158159
def default(self, obj):
160+
# Dapr-specific: objects implementing the duck-typed model protocol
161+
# (model_dump + model_validate) are emitted as plain JSON objects with
162+
# no AUTO_SERIALIZED marker, so the payload stays readable by
163+
# non-Python SDKs and by workflows/activities that don't import the
164+
# same class. Type-directed reconstruction happens at the
165+
# activity/workflow input boundary in
166+
# dapr.ext.workflow.workflow_runtime. No pydantic dependency — any
167+
# class matching the protocol works (Pydantic v2, SQLModel, custom).
168+
if _model_protocol.is_model(obj):
169+
return _model_protocol.dump_model(obj)
159170
if dataclasses.is_dataclass(obj):
160171
# Dataclasses are not serializable by default, so we convert them to a dict and mark them for
161172
# automatic deserialization by the receiver
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
# -*- coding: utf-8 -*-
2+
3+
"""
4+
Copyright 2026 The Dapr Authors
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
"""
15+
16+
from __future__ import annotations
17+
18+
import inspect
19+
import typing
20+
from functools import lru_cache
21+
from types import SimpleNamespace
22+
from typing import Any, Callable, Optional
23+
24+
# A "model" here is anything that implements the Pydantic v2 shape:
25+
# - model_dump(self, ...) -> dict
26+
# - cls.model_validate(value) -> instance
27+
# We duck-type on these names rather than importing pydantic so the SDK has no
28+
# hard dependency on pydantic (or any specific version of it). SQLModel,
29+
# FastAPI response models, and custom classes mirroring the protocol all work.
30+
31+
32+
def is_model(obj: Any) -> bool:
33+
"""Whether obj implements the model protocol (model_dump + model_validate)."""
34+
return is_model_class(type(obj))
35+
36+
37+
def is_model_class(cls: Any) -> bool:
38+
"""Whether cls is a class implementing the model protocol."""
39+
return (
40+
inspect.isclass(cls)
41+
and callable(getattr(cls, 'model_dump', None))
42+
and callable(getattr(cls, 'model_validate', None))
43+
)
44+
45+
46+
@lru_cache(maxsize=None)
47+
def _supports_mode_kwarg(cls: type) -> bool:
48+
"""Whether cls.model_dump accepts a `mode` keyword (Pydantic v2 signature)."""
49+
try:
50+
sig = inspect.signature(cls.model_dump)
51+
except (TypeError, ValueError):
52+
return False
53+
params = sig.parameters
54+
if 'mode' in params:
55+
return True
56+
return any(p.kind == inspect.Parameter.VAR_KEYWORD for p in params.values())
57+
58+
59+
def dump_model(model: Any) -> Any:
60+
"""Serialize a model instance to a JSON-compatible primitive graph.
61+
62+
Prefers model_dump(mode='json') when supported so nested datetimes, enums,
63+
and UUIDs render into JSON-safe primitives. Falls back to bare model_dump()
64+
for protocol-compatible classes that don't accept the mode kwarg — those
65+
classes are responsible for returning JSON-safe values themselves.
66+
"""
67+
if not is_model(model):
68+
raise TypeError(
69+
f'Expected a model-like object with model_dump/model_validate, '
70+
f'got {type(model).__name__}'
71+
)
72+
cls = type(model)
73+
if _supports_mode_kwarg(cls):
74+
return model.model_dump(mode='json')
75+
return model.model_dump()
76+
77+
78+
def coerce_to_model(value: Any, cls: type) -> Any:
79+
"""Reconstruct a model instance from a decoded JSON payload.
80+
81+
Accepts dicts, SimpleNamespace (from the InternalJSONDecoder's
82+
AUTO_SERIALIZED path), or already-instantiated models. Any other shape
83+
raises TypeError so the failure surfaces at the activity/workflow
84+
boundary rather than later as an attribute access error.
85+
"""
86+
if not is_model_class(cls):
87+
raise TypeError(f'{cls!r} is not a model class (no model_dump/model_validate)')
88+
if isinstance(value, cls):
89+
return value
90+
if isinstance(value, SimpleNamespace):
91+
value = vars(value)
92+
if isinstance(value, dict):
93+
return cls.model_validate(value)
94+
raise TypeError(
95+
f'Cannot coerce value of type {type(value).__name__} into {cls.__name__}; '
96+
'expected a dict, SimpleNamespace, or existing model instance.'
97+
)
98+
99+
100+
def resolve_input(fn: Callable[..., Any]) -> tuple[bool, Optional[type]]:
101+
"""Inspect fn's input parameter.
102+
103+
Returns (accepts_input, model_class):
104+
- accepts_input is True when fn declares a second positional parameter
105+
(beyond the context) — the runtime must then pass the input through
106+
even when it is None, so `Optional[Model]` works without a default.
107+
- model_class is the model class annotated on that parameter, or None
108+
when there is no annotation or the annotation is not a model.
109+
Optional[Model] and Model | None are unwrapped to Model.
110+
"""
111+
try:
112+
sig = inspect.signature(fn)
113+
except (TypeError, ValueError):
114+
return False, None
115+
116+
params = list(sig.parameters.values())
117+
if len(params) < 2:
118+
return False, None
119+
120+
annotation = params[1].annotation
121+
if annotation is inspect.Parameter.empty:
122+
return True, None
123+
124+
if isinstance(annotation, str):
125+
try:
126+
hints = typing.get_type_hints(fn)
127+
annotation = hints.get(params[1].name, annotation)
128+
except Exception:
129+
return True, None
130+
131+
annotation = _unwrap_optional(annotation)
132+
return True, (annotation if is_model_class(annotation) else None)
133+
134+
135+
def _unwrap_optional(annotation: Any) -> Any:
136+
"""Unwrap Optional[X] / X | None to X. Leaves other annotations unchanged."""
137+
origin = typing.get_origin(annotation)
138+
if origin is typing.Union or _is_pep604_union(origin):
139+
args = [a for a in typing.get_args(annotation) if a is not type(None)]
140+
if len(args) == 1:
141+
return args[0]
142+
return annotation
143+
144+
145+
def _is_pep604_union(origin: Any) -> bool:
146+
try:
147+
from types import UnionType # type: ignore[attr-defined]
148+
149+
return origin is UnionType
150+
except ImportError:
151+
return False

ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
from dapr.conf import settings
3232
from dapr.conf.helpers import GrpcEndpoint
3333

34+
from . import _model_protocol
35+
3436
T = TypeVar('T')
3537
TInput = TypeVar('TInput')
3638
TOutput = TypeVar('TOutput')
@@ -89,15 +91,23 @@ def register_workflow(self, fn: Workflow, *, name: Optional[str] = None):
8991
effective_name = name or fn.__name__
9092
self._logger.info(f"Registering workflow '{effective_name}' with runtime")
9193

94+
accepts_input, input_model = _model_protocol.resolve_input(fn)
95+
9296
def orchestrationWrapper(ctx: task.OrchestrationContext, inp: Optional[TInput] = None):
9397
"""Responsible to call Workflow function in orchestrationWrapper"""
9498
instance_id = getattr(ctx, 'instance_id', 'unknown')
9599

96100
try:
97101
daprWfContext = DaprWorkflowContext(ctx, self._logger.get_options())
98-
if inp is None:
102+
if not accepts_input:
99103
result = fn(daprWfContext)
100104
else:
105+
if (
106+
(inp is not None)
107+
and (input_model is not None)
108+
and not isinstance(inp, input_model)
109+
):
110+
inp = _model_protocol.coerce_to_model(inp, input_model)
101111
result = fn(daprWfContext, inp)
102112
return result
103113
except Exception as e:
@@ -131,11 +141,15 @@ def register_versioned_workflow(
131141
f"Registering version {version_name} of workflow '{effective_name}' with runtime"
132142
)
133143

144+
accepts_input, input_model = _model_protocol.resolve_input(fn)
145+
134146
def orchestrationWrapper(ctx: task.OrchestrationContext, inp: Optional[TInput] = None):
135147
"""Responsible to call Workflow function in orchestrationWrapper"""
136148
daprWfContext = DaprWorkflowContext(ctx, self._logger.get_options())
137-
if inp is None:
149+
if not accepts_input:
138150
return fn(daprWfContext)
151+
if (inp is not None) and (input_model is not None) and not isinstance(inp, input_model):
152+
inp = _model_protocol.coerce_to_model(inp, input_model)
139153
return fn(daprWfContext, inp)
140154

141155
if hasattr(fn, '_workflow_registered'):
@@ -167,15 +181,23 @@ def register_activity(self, fn: Activity, *, name: Optional[str] = None):
167181
effective_name = name or fn.__name__
168182
self._logger.info(f"Registering activity '{effective_name}' with runtime")
169183

184+
accepts_input, input_model = _model_protocol.resolve_input(fn)
185+
170186
def activityWrapper(ctx: task.ActivityContext, inp: Optional[TInput] = None):
171187
"""Responsible to call Activity function in activityWrapper"""
172188
activity_id = getattr(ctx, 'task_id', 'unknown')
173189

174190
try:
175191
wfActivityContext = WorkflowActivityContext(ctx)
176-
if inp is None:
192+
if not accepts_input:
177193
result = fn(wfActivityContext)
178194
else:
195+
if (
196+
(inp is not None)
197+
and (input_model is not None)
198+
and not isinstance(inp, input_model)
199+
):
200+
inp = _model_protocol.coerce_to_model(inp, input_model)
179201
result = fn(wfActivityContext, inp)
180202
return result
181203
except Exception as e:

0 commit comments

Comments
 (0)