Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 48 additions & 46 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,54 +12,44 @@ ADDED
- Added a pluggable `DataConverter` (`durabletask.serialization`) accepted by
`TaskHubGrpcWorker`, `TaskHubGrpcClient`, and `AsyncTaskHubGrpcClient` via a
`data_converter` argument. Every payload boundary (inputs, outputs, events,
custom status, entity state) routes through it. The default
custom status, entity state) routes through it, so one converter controls how
Python values become JSON and how they are reconstructed. The default
`JsonDataConverter` preserves existing behavior, so a custom converter (for
example one backed by pydantic) is opt-in. Custom objects can opt in via a
`to_json()` hook and a `from_json(value)` classmethod.
- `OrchestrationContext.call_activity`, `call_sub_orchestrator`, and
`call_entity` accept an optional `return_type`, and `wait_for_external_event`
accepts an optional `data_type`. When provided, the result/event payload is
reconstructed as that type (dataclasses — including nested dataclass,
`Optional`, and `list` fields — and `from_json()`-capable types) and the
returned task is typed accordingly (e.g. `call_activity(..., return_type=Foo)`
yields `CompletableTask[Foo]`). When omitted, the raw deserialized JSON is
returned as before.
- Inbound payloads are reconstructed from function type annotations. When an
orchestrator, activity, or entity operation annotates its input parameter (or
an activity its return value) with a dataclass or `from_json()`-capable type,
the payload is reconstructed as that type. Builtins and unannotated/unknown
types are passed through unchanged. An explicit `return_type` takes precedence
over a discovered annotation.
- Added typed accessors to `client.OrchestrationState`: `get_input()`,
`get_output()`, and `get_custom_status()` each accept an optional
`expected_type` and deserialize the corresponding payload, reconstructing
dataclasses and `from_json()`-capable types. The raw `serialized_*` fields are
retained.
- Objects exposing a `to_json()` method are now JSON-serializable when passed as
activity/orchestrator inputs or outputs.
- Added `EntityMetadata.get_typed_state(intended_type=...)`, which deserializes
the entity's persisted state and reconstructs dataclasses and
`from_json()`-capable types. The existing `get_state()` is unchanged: with no
argument it returns the raw serialized JSON payload, and `get_state(some_type)`
applies constructor-based coercion (`some_type(raw)`).
- Entity runtime state retrieval (`EntityContext.get_state(intended_type=...)` /
`DurableEntity.get_state(...)`) now also reconstructs dataclasses and
`from_json()`-capable types, in addition to the existing constructor-based
coercion.
example one backed by pydantic) is fully opt-in.
- Custom objects can participate in serialization by exposing a `to_json()`
method and a `from_json(value)` classmethod. Both are honored recursively, so
nested custom objects round-trip through their own hooks.
- Payloads are reconstructed into a caller-supplied type — dataclasses
(including nested fields), `from_json()`-capable types, and `enum.Enum`
members, recursing through `list`, `dict`, `tuple`, and `Optional`/`Union`
hints. The type comes from a function's annotations, from an explicit
`return_type` on `call_activity` / `call_sub_orchestrator` / `call_entity`
(or `data_type` on `wait_for_external_event`), or from the typed accessors
`get_input()` / `get_output()` / `get_custom_status()` on
`client.OrchestrationState` and `EntityMetadata.get_typed_state(...)`. It is
never inferred from the payload. Which annotated types are eligible is decided
by the converter via the overridable `DataConverter.can_reconstruct(...)`; a
custom converter can override it to recognize its own types (for example
`pydantic.BaseModel` subclasses).

CHANGED

- Custom objects (dataclasses, `SimpleNamespace`, namedtuples) are now
serialized as plain JSON. Decoding such a payload *without* a type hint now
yields a plain `dict` (previously a `SimpleNamespace`; a namedtuple now
round-trips as a JSON array). To get the original type back, pass the new
`return_type` / `data_type` arguments, annotate the consuming function's
parameter or return type, or use the typed client accessors. Payloads produced
by older SDK versions still deserialize — including into a `SimpleNamespace`
when no type is supplied — so in-flight orchestrations continue to replay
across an upgrade.
round-trips as a JSON array). To get the original type back, supply a type via
one of the mechanisms above. Payloads produced by older SDK versions still
deserialize — including into a `SimpleNamespace` when no type is supplied — so
in-flight orchestrations continue to replay across an upgrade.
- JSON serialization failures now raise a `TypeError` that chains the original
error (`__cause__`) and names the offending type.
- `EntityContext.get_state()` / `DurableEntity.get_state()` now return a freshly
reconstructed value on every call rather than a reference to a single cached
object. This changes v1.6.0 behavior: mutating the returned value in place no
longer affects persisted state — write it back with `set_state()`. State is
also serialized eagerly at `set_state()` time, so a non-serializable value
fails inside the operation (which rolls back) instead of after the batch has
run.

FIXED

Expand All @@ -68,19 +58,31 @@ FIXED
"no state" and written as `None`, effectively deleting it; only an actual
`None` state now clears the persisted entity state.

BREAKING CHANGES (type-level only — no runtime impact for typical users)
DEPRECATED

- `durabletask.internal.shared.to_json` and `durabletask.internal.shared.from_json`
are deprecated and now emit a `DeprecationWarning`. Use a
`durabletask.serialization.DataConverter` (for example the default
`JsonDataConverter`) instead. The functions continue to work for backwards
compatibility.

BREAKING CHANGES (no runtime impact for typical users)

These changes do not alter runtime behavior, but because the package ships
`py.typed`, consumers running strict type checkers (pyright/mypy) — or
subclassing the public abstract types — may need to update their code:
Most of these are type-level only: because the package ships `py.typed`,
consumers running strict type checkers (pyright/mypy) — or subclassing the
public abstract types — may need to update their code. The constructor change
below also affects callers who *directly* construct the named classes, which is
uncommon since they are normally handed to you by the SDK.

- `OrchestrationContext.call_activity`, `call_sub_orchestrator`, `call_entity`,
and `wait_for_external_event` gained new keyword-only parameters
(`return_type` / `data_type`). Subclasses overriding these methods should add
the parameter to match the base signature.
- `client.OrchestrationState` gained a non-public `_data_converter` field
(excluded from equality and `repr`). Code constructing `OrchestrationState`
positionally should pass it via the new field or rely on its default.
- `EntityContext` and `EntityMetadata` (and its `from_entity_metadata` /
`from_entity_response` factories) now require a `data_converter` argument.
These objects are normally constructed by the SDK — you receive an
`EntityContext` in an entity function and an `EntityMetadata` from the client —
so this only affects code that constructs them directly.

## v1.6.0

Expand Down
7 changes: 6 additions & 1 deletion durabletask-azuremanaged/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

N/A
ADDED

- `DurableTaskSchedulerWorker`, `DurableTaskSchedulerClient`, and the async
client now accept a `data_converter` argument and forward it to the base
worker/client, so a custom `durabletask.serialization.DataConverter` (for
example a pydantic-backed one) can be used with the Durable Task Scheduler.

## v1.6.0

Expand Down
9 changes: 7 additions & 2 deletions durabletask-azuremanaged/durabletask/azuremanaged/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
)
import durabletask.internal.shared as shared
from durabletask.payload.store import PayloadStore
from durabletask.serialization import DataConverter


# Client class used for Durable Task Scheduler (DTS)
Expand All @@ -35,6 +36,7 @@ def __init__(self, *,
resiliency_options: GrpcClientResiliencyOptions | None = None,
default_version: str | None = None,
payload_store: PayloadStore | None = None,
data_converter: DataConverter | None = None,
log_handler: logging.Handler | None = None,
log_formatter: logging.Formatter | None = None):

Expand All @@ -59,7 +61,8 @@ def __init__(self, *,
channel_options=channel_options,
resiliency_options=resiliency_options,
default_version=default_version,
payload_store=payload_store)
payload_store=payload_store,
data_converter=data_converter)


# Async client class used for Durable Task Scheduler (DTS)
Expand Down Expand Up @@ -113,6 +116,7 @@ def __init__(self, *,
resiliency_options: GrpcClientResiliencyOptions | None = None,
default_version: str | None = None,
payload_store: PayloadStore | None = None,
data_converter: DataConverter | None = None,
log_handler: logging.Handler | None = None,
log_formatter: logging.Formatter | None = None):

Expand All @@ -137,4 +141,5 @@ def __init__(self, *,
channel_options=channel_options,
resiliency_options=resiliency_options,
default_version=default_version,
payload_store=payload_store)
payload_store=payload_store,
data_converter=data_converter)
5 changes: 4 additions & 1 deletion durabletask-azuremanaged/durabletask/azuremanaged/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
)
import durabletask.internal.shared as shared
from durabletask.payload.store import PayloadStore
from durabletask.serialization import DataConverter
from durabletask.worker import ConcurrencyOptions, TaskHubGrpcWorker


Expand Down Expand Up @@ -81,6 +82,7 @@ def __init__(self, *,
resiliency_options: GrpcWorkerResiliencyOptions | None = None,
concurrency_options: ConcurrencyOptions | None = None,
payload_store: PayloadStore | None = None,
data_converter: DataConverter | None = None,
log_handler: logging.Handler | None = None,
log_formatter: logging.Formatter | None = None):

Expand Down Expand Up @@ -110,5 +112,6 @@ def __init__(self, *,
concurrency_options=concurrency_options,
# DTS natively supports long timers so chunking is unnecessary
maximum_timer_interval=None,
payload_store=payload_store
payload_store=payload_store,
data_converter=data_converter
)
5 changes: 1 addition & 4 deletions durabletask/entities/entity_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,11 @@

class EntityContext:
def __init__(self, orchestration_id: str, operation: str, state: StateShim,
entity_id: EntityInstanceId, data_converter: "DataConverter | None" = None):
entity_id: EntityInstanceId, data_converter: "DataConverter"):
self._orchestration_id = orchestration_id
self._operation = operation
self._state = state
self._entity_id = entity_id
if data_converter is None:
from durabletask.serialization import JsonDataConverter
data_converter = JsonDataConverter()
self._data_converter = data_converter

@property
Expand Down
9 changes: 3 additions & 6 deletions durabletask/entities/entity_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def __init__(self,
locked_by: str,
includes_state: bool,
state: Any | None,
data_converter: "DataConverter | None" = None):
data_converter: "DataConverter"):
"""Initializes a new instance of the EntityMetadata class.

Args:
Expand All @@ -48,20 +48,17 @@ def __init__(self,
self._locked_by = locked_by
self.includes_state = includes_state
self._state = state
if data_converter is None:
from durabletask.serialization import JsonDataConverter
data_converter = JsonDataConverter()
self._data_converter = data_converter

@staticmethod
def from_entity_response(entity_response: pb.GetEntityResponse, includes_state: bool,
data_converter: "DataConverter | None" = None):
data_converter: "DataConverter"):
return EntityMetadata.from_entity_metadata(
entity_response.entity, includes_state, data_converter)

@staticmethod
def from_entity_metadata(entity: pb.EntityMetadata, includes_state: bool,
data_converter: "DataConverter | None" = None):
data_converter: "DataConverter"):
try:
entity_id = EntityInstanceId.parse(entity.instanceId)
except ValueError:
Expand Down
80 changes: 63 additions & 17 deletions durabletask/internal/entity_state_shim.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,49 @@


class StateShim:
def __init__(self, start_state: Any, data_converter: "DataConverter | None" = None):
self._current_state: Any = start_state
self._checkpoint_state: Any = start_state
"""In-memory view of an entity's state during a batch.

The state is held internally as its serialized JSON string at all times.
The raw payload off the wire is stored verbatim; a live value supplied via
:meth:`set_state` (or as a non-serialized constructor argument) is
serialized immediately. Keeping a single, always-serialized representation
has two consequences worth noting:

* Deserialization is deferred to :meth:`get_state`, so the caller's
requested type reaches the data converter together with the original
payload (a custom converter can deserialize the string directly into the
target type), and the unmodified wire payload is handed back by
:meth:`encode_state` without being re-encoded.
* Serialization errors surface inside the failing operation (at
:meth:`set_state`) rather than after the batch has run, so a bad write
rolls back just that operation.

Because the held value is always the serialized form, :meth:`get_state`
returns a freshly reconstructed object on every call; it does **not** return
a reference to a stored live object. Mutating a value read from
:meth:`get_state` therefore has no effect on the persisted state unless it
is written back with :meth:`set_state`.
"""

def __init__(self, start_state: Any, data_converter: "DataConverter",
*, is_serialized: bool = False):
self._data_converter = data_converter
# The state is normalized to its serialized string form. ``is_serialized``
# marks ``start_state`` as a raw payload already off the wire (stored
# verbatim); otherwise a live value is serialized now. ``None`` stays
# ``None`` (no persisted state).
serialized_start = self._serialize(start_state, is_serialized)
self._current_state: str | None = serialized_start
self._checkpoint_state: str | None = serialized_start
self._operation_actions: list[pb.OperationAction] = []
self._actions_checkpoint_state: int = 0
if data_converter is None:
from durabletask.serialization import JsonDataConverter
data_converter = JsonDataConverter()
self._data_converter = data_converter

def _serialize(self, state: Any, is_serialized: bool = False) -> str | None:
if state is None:
return None
if is_serialized:
return state
return self._data_converter.serialize(state)

@overload
def get_state(self, intended_type: type[TState], default: TState) -> TState:
Expand All @@ -35,31 +69,43 @@ def get_state(self, intended_type: None = None, default: Any = None) -> Any:
...

def get_state(self, intended_type: type[TState] | None = None, default: TState | None = None) -> TState | Any | None:
if self._current_state is None and default is not None:
if self._current_state is None:
return default

# Deferred deserialization: the converter receives the raw payload
# together with the requested type.
if intended_type is None:
return self._current_state

coerced = self._data_converter.coerce(self._current_state, intended_type)
return self._data_converter.deserialize(self._current_state)
result = self._data_converter.deserialize(self._current_state, intended_type)

# An explicit ``intended_type`` is a request to receive that type. The
# default converter is best-effort and would silently return the raw
# value on a failed coercion; restore the stricter contract here by
# raising when a non-None state could not be coerced to a concrete type.
# ``intended_type`` may be a typing generic (e.g. ``list[int]``) at
# runtime, which is not a ``type`` instance, so the guard is required.
if (self._current_state is not None
and isinstance(intended_type, type) # pyright: ignore[reportUnnecessaryIsInstance]
and not isinstance(coerced, intended_type)):
if (isinstance(intended_type, type) # pyright: ignore[reportUnnecessaryIsInstance]
and not isinstance(result, intended_type)):
raise TypeError(
f"Could not convert state of type '{type(self._current_state).__name__}' to '{intended_type.__name__}'"
f"Could not convert state of type '{type(result).__name__}' to '{intended_type.__name__}'"
)

return coerced
return result

def set_state(self, state: Any) -> None:
self._current_state = state
# Serialize eagerly so the held value is always the wire form and any
# serialization error surfaces here, inside the failing operation.
self._current_state = self._serialize(state)

def encode_state(self) -> str | None:
"""Return the serialized current state for persistence back to the wire.

The state is already held in serialized form, so this is the stored
value verbatim: ``None`` when there is no state (which clears the
persisted entity state), otherwise the JSON string. No re-encoding
occurs, so a payload that was never modified round-trips unchanged.
"""
return self._current_state

def add_operation_action(self, action: pb.OperationAction) -> None:
self._operation_actions.append(action)
Expand Down
Loading
Loading