Skip to content

Commit f6bfd44

Browse files
andystaplesCopilot
andauthored
Fix custom serialization gaps from #154 (#161)
* Fix custom serialization gaps from #154 Close several round-tripping gaps left by the type-aware custom serialization work in #154, without introducing new breaking changes versus 1.5.0 or any serialization-related security concerns. Serialize side: - Prefer a to_json() hook over the built-in dataclass / SimpleNamespace handling so a dataclass (or namespace) with a non-serializable field can opt in, mirroring the decode side which already prefers from_json(). - Encode dataclasses via a shallow field mapping instead of dataclasses.asdict(), so nested to_json() hooks are honored and leaf values are not deep-copied. - Serialize enum.Enum values to their underlying .value so non-int enums round-trip (IntEnum already serialized as integers). Deserialize side: - Recurse type-directed reconstruction into dict/Mapping values and tuple elements, in addition to the existing list / Optional / Union / dataclass recursion. - Optionally pass the active DataConverter to a from_json(cls, value, converter) hook so it can rebuild nested typed values the built-in recursion does not cover. Entity state: - Defer deserialization of an entity's wire state until get_state() is called, so the caller's requested type reaches the converter together with the raw payload. Track whether the held value is still the raw serialized string and pass it back through unchanged on persist to avoid double-encoding. - Replace a redundant serialize/deserialize round-trip in the legacy entity event path with converter.coerce(). Module structure / deprecation: - Merge the internal json_codec module into durabletask.serialization and make the codec functions private; the supported surface is the pluggable DataConverter. - Deprecate durabletask.internal.shared.to_json / from_json with a DeprecationWarning; they continue to work for backwards compatibility. Adds a comprehensive JsonDataConverter round-trip test suite plus targeted tests for each fix, and documents intentional limitations (multi-member Union, types needing a custom converter such as datetime/Decimal/set). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Address PR feedback * Fix annotation discovery in 3.10 * Add pydantic example, fix reconstructibility concern * More fixes: - Rename is_reconstructible to can_reconstruct - Correct ownership of _can_reconstruct - Required DataConverter for internal classes * CHANGELOG summarization * No more silent fallbacks to JsonDataConverter * PR feedback * Final CHANGELOG tuneups --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 532479d commit f6bfd44

29 files changed

Lines changed: 2560 additions & 516 deletions

CHANGELOG.md

Lines changed: 48 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -12,54 +12,44 @@ ADDED
1212
- Added a pluggable `DataConverter` (`durabletask.serialization`) accepted by
1313
`TaskHubGrpcWorker`, `TaskHubGrpcClient`, and `AsyncTaskHubGrpcClient` via a
1414
`data_converter` argument. Every payload boundary (inputs, outputs, events,
15-
custom status, entity state) routes through it. The default
15+
custom status, entity state) routes through it, so one converter controls how
16+
Python values become JSON and how they are reconstructed. The default
1617
`JsonDataConverter` preserves existing behavior, so a custom converter (for
17-
example one backed by pydantic) is opt-in. Custom objects can opt in via a
18-
`to_json()` hook and a `from_json(value)` classmethod.
19-
- `OrchestrationContext.call_activity`, `call_sub_orchestrator`, and
20-
`call_entity` accept an optional `return_type`, and `wait_for_external_event`
21-
accepts an optional `data_type`. When provided, the result/event payload is
22-
reconstructed as that type (dataclasses — including nested dataclass,
23-
`Optional`, and `list` fields — and `from_json()`-capable types) and the
24-
returned task is typed accordingly (e.g. `call_activity(..., return_type=Foo)`
25-
yields `CompletableTask[Foo]`). When omitted, the raw deserialized JSON is
26-
returned as before.
27-
- Inbound payloads are reconstructed from function type annotations. When an
28-
orchestrator, activity, or entity operation annotates its input parameter (or
29-
an activity its return value) with a dataclass or `from_json()`-capable type,
30-
the payload is reconstructed as that type. Builtins and unannotated/unknown
31-
types are passed through unchanged. An explicit `return_type` takes precedence
32-
over a discovered annotation.
33-
- Added typed accessors to `client.OrchestrationState`: `get_input()`,
34-
`get_output()`, and `get_custom_status()` each accept an optional
35-
`expected_type` and deserialize the corresponding payload, reconstructing
36-
dataclasses and `from_json()`-capable types. The raw `serialized_*` fields are
37-
retained.
38-
- Objects exposing a `to_json()` method are now JSON-serializable when passed as
39-
activity/orchestrator inputs or outputs.
40-
- Added `EntityMetadata.get_typed_state(intended_type=...)`, which deserializes
41-
the entity's persisted state and reconstructs dataclasses and
42-
`from_json()`-capable types. The existing `get_state()` is unchanged: with no
43-
argument it returns the raw serialized JSON payload, and `get_state(some_type)`
44-
applies constructor-based coercion (`some_type(raw)`).
45-
- Entity runtime state retrieval (`EntityContext.get_state(intended_type=...)` /
46-
`DurableEntity.get_state(...)`) now also reconstructs dataclasses and
47-
`from_json()`-capable types, in addition to the existing constructor-based
48-
coercion.
18+
example one backed by pydantic) is fully opt-in.
19+
- Custom objects can participate in serialization by exposing a `to_json()`
20+
method and a `from_json(value)` classmethod. Both are honored recursively, so
21+
nested custom objects round-trip through their own hooks.
22+
- Payloads are reconstructed into a caller-supplied type — dataclasses
23+
(including nested fields), `from_json()`-capable types, and `enum.Enum`
24+
members, recursing through `list`, `dict`, `tuple`, and `Optional`/`Union`
25+
hints. The type comes from a function's annotations, from an explicit
26+
`return_type` on `call_activity` / `call_sub_orchestrator` / `call_entity`
27+
(or `data_type` on `wait_for_external_event`), or from the typed accessors
28+
`get_input()` / `get_output()` / `get_custom_status()` on
29+
`client.OrchestrationState` and `EntityMetadata.get_typed_state(...)`. It is
30+
never inferred from the payload. Which annotated types are eligible is decided
31+
by the converter via the overridable `DataConverter.can_reconstruct(...)`; a
32+
custom converter can override it to recognize its own types (for example
33+
`pydantic.BaseModel` subclasses).
4934

5035
CHANGED
5136

5237
- Custom objects (dataclasses, `SimpleNamespace`, namedtuples) are now
5338
serialized as plain JSON. Decoding such a payload *without* a type hint now
5439
yields a plain `dict` (previously a `SimpleNamespace`; a namedtuple now
55-
round-trips as a JSON array). To get the original type back, pass the new
56-
`return_type` / `data_type` arguments, annotate the consuming function's
57-
parameter or return type, or use the typed client accessors. Payloads produced
58-
by older SDK versions still deserialize — including into a `SimpleNamespace`
59-
when no type is supplied — so in-flight orchestrations continue to replay
60-
across an upgrade.
40+
round-trips as a JSON array). To get the original type back, supply a type via
41+
one of the mechanisms above. Payloads produced by older SDK versions still
42+
deserialize — including into a `SimpleNamespace` when no type is supplied — so
43+
in-flight orchestrations continue to replay across an upgrade.
6144
- JSON serialization failures now raise a `TypeError` that chains the original
6245
error (`__cause__`) and names the offending type.
46+
- `EntityContext.get_state()` / `DurableEntity.get_state()` now return a freshly
47+
reconstructed value on every call rather than a reference to a single cached
48+
object. This changes v1.6.0 behavior: mutating the returned value in place no
49+
longer affects persisted state — write it back with `set_state()`. State is
50+
also serialized eagerly at `set_state()` time, so a non-serializable value
51+
fails inside the operation (which rolls back) instead of after the batch has
52+
run.
6353

6454
FIXED
6555

@@ -68,19 +58,31 @@ FIXED
6858
"no state" and written as `None`, effectively deleting it; only an actual
6959
`None` state now clears the persisted entity state.
7060

71-
BREAKING CHANGES (type-level only — no runtime impact for typical users)
61+
DEPRECATED
62+
63+
- `durabletask.internal.shared.to_json` and `durabletask.internal.shared.from_json`
64+
are deprecated and now emit a `DeprecationWarning`. Use a
65+
`durabletask.serialization.DataConverter` (for example the default
66+
`JsonDataConverter`) instead. The functions continue to work for backwards
67+
compatibility.
68+
69+
BREAKING CHANGES (no runtime impact for typical users)
7270

73-
These changes do not alter runtime behavior, but because the package ships
74-
`py.typed`, consumers running strict type checkers (pyright/mypy) — or
75-
subclassing the public abstract types — may need to update their code:
71+
Most of these are type-level only: because the package ships `py.typed`,
72+
consumers running strict type checkers (pyright/mypy) — or subclassing the
73+
public abstract types — may need to update their code. The constructor change
74+
below also affects callers who *directly* construct the named classes, which is
75+
uncommon since they are normally handed to you by the SDK.
7676

7777
- `OrchestrationContext.call_activity`, `call_sub_orchestrator`, `call_entity`,
7878
and `wait_for_external_event` gained new keyword-only parameters
7979
(`return_type` / `data_type`). Subclasses overriding these methods should add
8080
the parameter to match the base signature.
81-
- `client.OrchestrationState` gained a non-public `_data_converter` field
82-
(excluded from equality and `repr`). Code constructing `OrchestrationState`
83-
positionally should pass it via the new field or rely on its default.
81+
- `EntityContext` and `EntityMetadata` (and its `from_entity_metadata` /
82+
`from_entity_response` factories) now require a `data_converter` argument.
83+
These objects are normally constructed by the SDK — you receive an
84+
`EntityContext` in an entity function and an `EntityMetadata` from the client —
85+
so this only affects code that constructs them directly.
8486

8587
## v1.6.0
8688

durabletask-azuremanaged/CHANGELOG.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,12 @@ adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

88
## Unreleased
99

10-
N/A
10+
ADDED
11+
12+
- `DurableTaskSchedulerWorker`, `DurableTaskSchedulerClient`, and the async
13+
client now accept a `data_converter` argument and forward it to the base
14+
worker/client, so a custom `durabletask.serialization.DataConverter` (for
15+
example a pydantic-backed one) can be used with the Durable Task Scheduler.
1116

1217
## v1.6.0
1318

durabletask-azuremanaged/durabletask/azuremanaged/client.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
)
2121
import durabletask.internal.shared as shared
2222
from durabletask.payload.store import PayloadStore
23+
from durabletask.serialization import DataConverter
2324

2425

2526
# Client class used for Durable Task Scheduler (DTS)
@@ -35,6 +36,7 @@ def __init__(self, *,
3536
resiliency_options: GrpcClientResiliencyOptions | None = None,
3637
default_version: str | None = None,
3738
payload_store: PayloadStore | None = None,
39+
data_converter: DataConverter | None = None,
3840
log_handler: logging.Handler | None = None,
3941
log_formatter: logging.Formatter | None = None):
4042

@@ -59,7 +61,8 @@ def __init__(self, *,
5961
channel_options=channel_options,
6062
resiliency_options=resiliency_options,
6163
default_version=default_version,
62-
payload_store=payload_store)
64+
payload_store=payload_store,
65+
data_converter=data_converter)
6366

6467

6568
# Async client class used for Durable Task Scheduler (DTS)
@@ -113,6 +116,7 @@ def __init__(self, *,
113116
resiliency_options: GrpcClientResiliencyOptions | None = None,
114117
default_version: str | None = None,
115118
payload_store: PayloadStore | None = None,
119+
data_converter: DataConverter | None = None,
116120
log_handler: logging.Handler | None = None,
117121
log_formatter: logging.Formatter | None = None):
118122

@@ -137,4 +141,5 @@ def __init__(self, *,
137141
channel_options=channel_options,
138142
resiliency_options=resiliency_options,
139143
default_version=default_version,
140-
payload_store=payload_store)
144+
payload_store=payload_store,
145+
data_converter=data_converter)

durabletask-azuremanaged/durabletask/azuremanaged/worker.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
)
1919
import durabletask.internal.shared as shared
2020
from durabletask.payload.store import PayloadStore
21+
from durabletask.serialization import DataConverter
2122
from durabletask.worker import ConcurrencyOptions, TaskHubGrpcWorker
2223

2324

@@ -81,6 +82,7 @@ def __init__(self, *,
8182
resiliency_options: GrpcWorkerResiliencyOptions | None = None,
8283
concurrency_options: ConcurrencyOptions | None = None,
8384
payload_store: PayloadStore | None = None,
85+
data_converter: DataConverter | None = None,
8486
log_handler: logging.Handler | None = None,
8587
log_formatter: logging.Formatter | None = None):
8688

@@ -110,5 +112,6 @@ def __init__(self, *,
110112
concurrency_options=concurrency_options,
111113
# DTS natively supports long timers so chunking is unnecessary
112114
maximum_timer_interval=None,
113-
payload_store=payload_store
115+
payload_store=payload_store,
116+
data_converter=data_converter
114117
)

durabletask/entities/entity_context.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,11 @@
1616

1717
class EntityContext:
1818
def __init__(self, orchestration_id: str, operation: str, state: StateShim,
19-
entity_id: EntityInstanceId, data_converter: "DataConverter | None" = None):
19+
entity_id: EntityInstanceId, data_converter: "DataConverter"):
2020
self._orchestration_id = orchestration_id
2121
self._operation = operation
2222
self._state = state
2323
self._entity_id = entity_id
24-
if data_converter is None:
25-
from durabletask.serialization import JsonDataConverter
26-
data_converter = JsonDataConverter()
2724
self._data_converter = data_converter
2825

2926
@property

durabletask/entities/entity_metadata.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def __init__(self,
3636
locked_by: str,
3737
includes_state: bool,
3838
state: Any | None,
39-
data_converter: "DataConverter | None" = None):
39+
data_converter: "DataConverter"):
4040
"""Initializes a new instance of the EntityMetadata class.
4141
4242
Args:
@@ -48,20 +48,17 @@ def __init__(self,
4848
self._locked_by = locked_by
4949
self.includes_state = includes_state
5050
self._state = state
51-
if data_converter is None:
52-
from durabletask.serialization import JsonDataConverter
53-
data_converter = JsonDataConverter()
5451
self._data_converter = data_converter
5552

5653
@staticmethod
5754
def from_entity_response(entity_response: pb.GetEntityResponse, includes_state: bool,
58-
data_converter: "DataConverter | None" = None):
55+
data_converter: "DataConverter"):
5956
return EntityMetadata.from_entity_metadata(
6057
entity_response.entity, includes_state, data_converter)
6158

6259
@staticmethod
6360
def from_entity_metadata(entity: pb.EntityMetadata, includes_state: bool,
64-
data_converter: "DataConverter | None" = None):
61+
data_converter: "DataConverter"):
6562
try:
6663
entity_id = EntityInstanceId.parse(entity.instanceId)
6764
except ValueError:

durabletask/internal/entity_state_shim.py

Lines changed: 63 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,49 @@
1212

1313

1414
class StateShim:
15-
def __init__(self, start_state: Any, data_converter: "DataConverter | None" = None):
16-
self._current_state: Any = start_state
17-
self._checkpoint_state: Any = start_state
15+
"""In-memory view of an entity's state during a batch.
16+
17+
The state is held internally as its serialized JSON string at all times.
18+
The raw payload off the wire is stored verbatim; a live value supplied via
19+
:meth:`set_state` (or as a non-serialized constructor argument) is
20+
serialized immediately. Keeping a single, always-serialized representation
21+
has two consequences worth noting:
22+
23+
* Deserialization is deferred to :meth:`get_state`, so the caller's
24+
requested type reaches the data converter together with the original
25+
payload (a custom converter can deserialize the string directly into the
26+
target type), and the unmodified wire payload is handed back by
27+
:meth:`encode_state` without being re-encoded.
28+
* Serialization errors surface inside the failing operation (at
29+
:meth:`set_state`) rather than after the batch has run, so a bad write
30+
rolls back just that operation.
31+
32+
Because the held value is always the serialized form, :meth:`get_state`
33+
returns a freshly reconstructed object on every call; it does **not** return
34+
a reference to a stored live object. Mutating a value read from
35+
:meth:`get_state` therefore has no effect on the persisted state unless it
36+
is written back with :meth:`set_state`.
37+
"""
38+
39+
def __init__(self, start_state: Any, data_converter: "DataConverter",
40+
*, is_serialized: bool = False):
41+
self._data_converter = data_converter
42+
# The state is normalized to its serialized string form. ``is_serialized``
43+
# marks ``start_state`` as a raw payload already off the wire (stored
44+
# verbatim); otherwise a live value is serialized now. ``None`` stays
45+
# ``None`` (no persisted state).
46+
serialized_start = self._serialize(start_state, is_serialized)
47+
self._current_state: str | None = serialized_start
48+
self._checkpoint_state: str | None = serialized_start
1849
self._operation_actions: list[pb.OperationAction] = []
1950
self._actions_checkpoint_state: int = 0
20-
if data_converter is None:
21-
from durabletask.serialization import JsonDataConverter
22-
data_converter = JsonDataConverter()
23-
self._data_converter = data_converter
51+
52+
def _serialize(self, state: Any, is_serialized: bool = False) -> str | None:
53+
if state is None:
54+
return None
55+
if is_serialized:
56+
return state
57+
return self._data_converter.serialize(state)
2458

2559
@overload
2660
def get_state(self, intended_type: type[TState], default: TState) -> TState:
@@ -35,31 +69,43 @@ def get_state(self, intended_type: None = None, default: Any = None) -> Any:
3569
...
3670

3771
def get_state(self, intended_type: type[TState] | None = None, default: TState | None = None) -> TState | Any | None:
38-
if self._current_state is None and default is not None:
72+
if self._current_state is None:
3973
return default
4074

75+
# Deferred deserialization: the converter receives the raw payload
76+
# together with the requested type.
4177
if intended_type is None:
42-
return self._current_state
43-
44-
coerced = self._data_converter.coerce(self._current_state, intended_type)
78+
return self._data_converter.deserialize(self._current_state)
79+
result = self._data_converter.deserialize(self._current_state, intended_type)
4580

4681
# An explicit ``intended_type`` is a request to receive that type. The
4782
# default converter is best-effort and would silently return the raw
4883
# value on a failed coercion; restore the stricter contract here by
4984
# raising when a non-None state could not be coerced to a concrete type.
5085
# ``intended_type`` may be a typing generic (e.g. ``list[int]``) at
5186
# runtime, which is not a ``type`` instance, so the guard is required.
52-
if (self._current_state is not None
53-
and isinstance(intended_type, type) # pyright: ignore[reportUnnecessaryIsInstance]
54-
and not isinstance(coerced, intended_type)):
87+
if (isinstance(intended_type, type) # pyright: ignore[reportUnnecessaryIsInstance]
88+
and not isinstance(result, intended_type)):
5589
raise TypeError(
56-
f"Could not convert state of type '{type(self._current_state).__name__}' to '{intended_type.__name__}'"
90+
f"Could not convert state of type '{type(result).__name__}' to '{intended_type.__name__}'"
5791
)
5892

59-
return coerced
93+
return result
6094

6195
def set_state(self, state: Any) -> None:
62-
self._current_state = state
96+
# Serialize eagerly so the held value is always the wire form and any
97+
# serialization error surfaces here, inside the failing operation.
98+
self._current_state = self._serialize(state)
99+
100+
def encode_state(self) -> str | None:
101+
"""Return the serialized current state for persistence back to the wire.
102+
103+
The state is already held in serialized form, so this is the stored
104+
value verbatim: ``None`` when there is no state (which clears the
105+
persisted entity state), otherwise the JSON string. No re-encoding
106+
occurs, so a payload that was never modified round-trips unchanged.
107+
"""
108+
return self._current_state
63109

64110
def add_operation_action(self, action: pb.OperationAction) -> None:
65111
self._operation_actions.append(action)

0 commit comments

Comments
 (0)