-
Notifications
You must be signed in to change notification settings - Fork 29
Expand file tree
/
Copy pathjson_codec.py
More file actions
183 lines (151 loc) · 7.65 KB
/
Copy pathjson_codec.py
File metadata and controls
183 lines (151 loc) · 7.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Internal JSON codec for Durable Task payloads.
This module holds the low-level serialization *mechanism* -- the JSON string
encode/decode primitives and the value-level type coercion used to reconstruct
custom objects. Serialization *policy* (the public, pluggable strategy) lives in
:mod:`durabletask.serialization`; the default ``JsonDataConverter`` is the only
production consumer of ``to_json`` / ``from_json``, while ``coerce_to_type`` is
also used directly by entity state accessors that already hold a parsed value.
"""
from __future__ import annotations
import dataclasses
import json
import types
import typing
from collections.abc import Sequence
from types import SimpleNamespace
from typing import Any, cast
# Legacy payload marker. Older SDK versions added this key to JSON objects to
# flag them for automatic deserialization into a ``SimpleNamespace``. New code
# no longer emits it (objects are serialized as plain JSON), but the decode
# hooks in this module (``_legacy_object_hook`` and ``_strip_legacy_marker``)
# still recognize it so that orchestration histories produced by older SDK
# versions continue to replay.
AUTO_SERIALIZED = "__durabletask_autoobject__"
def to_json(obj: Any) -> str:
    """Encode ``obj`` as a JSON document and return it as a string.

    Values the standard encoder understands are written as plain JSON.
    Anything else is routed through ``_encode_custom_object``, which turns
    dataclass instances, ``SimpleNamespace`` objects, and objects whose type
    exposes a ``to_json`` hook into plain JSON too. No type marker is written;
    the receiver rebuilds custom objects by passing ``expected_type`` to
    :func:`from_json`.

    Raises:
        TypeError: If ``obj`` (or something nested in it) cannot be encoded.
            The message names the type of the top-level ``obj`` and the
            encoder's original error is attached as ``__cause__``.
    """
    try:
        document = json.dumps(obj, default=_encode_custom_object)
    except TypeError as e:
        # Re-raise with the top-level type in the message; chaining keeps the
        # encoder's own error available for diagnosis.
        raise TypeError(
            f"Failed to serialize object of type '{type(obj).__name__}' to JSON: {e}"
        ) from e
    else:
        return document
def from_json(json_str: str | bytes | bytearray, expected_type: type | None = None) -> Any:
    """Parse a JSON document, optionally coercing the result to a type.

    Two modes, selected by ``expected_type``:

    * ``None`` (default): the parsed JSON is returned as-is, except that
      objects carrying the legacy :data:`AUTO_SERIALIZED` marker are rebuilt
      as ``SimpleNamespace`` instances. This keeps histories written by older
      SDK versions replayable.
    * a type: the legacy marker, if present, is dropped from every object and
      the parsed value is handed to :func:`coerce_to_type`, which builds
      dataclasses from dict payloads, honors ``from_json()`` hooks, and
      recurses through ``Optional``/``Union`` and ``list`` hints.

    The target type always comes from the caller and is never read from the
    payload.
    """
    if expected_type is not None:
        # Typed path: strip the legacy marker so coercion sees plain dicts.
        parsed = json.loads(json_str, object_hook=_strip_legacy_marker)
        return coerce_to_type(parsed, expected_type)

    # Untyped path: honor the legacy marker for backwards compatibility.
    return json.loads(json_str, object_hook=_legacy_object_hook)
def _encode_custom_object(o: Any) -> Any:
    """Fallback encoder passed as ``default`` to :func:`json.dumps`.

    The JSON encoder only calls this for values it cannot serialize natively
    (namedtuples, for example, are tuples and are written as JSON arrays
    without ever reaching this hook). The return value is a JSON-serializable
    structure, not a JSON document:

    * dataclass *instances* become ``dataclasses.asdict(o)``;
    * ``SimpleNamespace`` objects become their attribute dict;
    * any other object whose type exposes a callable ``to_json`` gets
      ``type(o).to_json(o)``.

    Raises:
        TypeError: If none of the cases above applies.
    """
    # ``is_dataclass`` is also true for the dataclass *type* itself; only
    # instances can be converted with ``asdict``.
    is_dataclass_instance = dataclasses.is_dataclass(o) and not isinstance(o, type)
    if is_dataclass_instance:
        # NOTE(review): this branch runs before the ``to_json`` lookup, so a
        # dataclass that also defines ``to_json`` is encoded via ``asdict``.
        # ``coerce_to_type`` checks ``from_json`` before its dataclass branch;
        # confirm the asymmetry is intended.
        return dataclasses.asdict(o)

    if isinstance(o, SimpleNamespace):
        return vars(o)

    # Opt-in hook for custom objects. Looking it up on the type and passing
    # the instance explicitly makes plain instance methods and
    # ``@staticmethod`` hooks behave the same way -- the calling convention
    # used by ``azure-functions-durable``.
    hook = getattr(cast(Any, type(o)), "to_json", None)
    if not callable(hook):
        raise TypeError(f"Object of type '{type(o).__name__}' is not JSON serializable")
    return hook(o)
def _legacy_object_hook(d: dict[str, Any]) -> Any:
    """``object_hook`` for untyped decoding that honors the legacy marker.

    The :data:`AUTO_SERIALIZED` key is always removed from ``d`` (in place).
    If it was present with a truthy value, the remaining entries are returned
    as a ``SimpleNamespace``; otherwise ``d`` itself is returned.
    """
    was_auto_serialized = d.pop(AUTO_SERIALIZED, False)
    return SimpleNamespace(**d) if was_auto_serialized else d
def _strip_legacy_marker(d: dict[str, Any]) -> dict[str, Any]:
    """``object_hook`` for typed decoding: drop the legacy marker key.

    Removes :data:`AUTO_SERIALIZED` from ``d`` in place when present, so that
    typed coercion sees a plain dict, and returns the same dict.
    """
    if AUTO_SERIALIZED in d:
        del d[AUTO_SERIALIZED]
    return d
def coerce_to_type(value: Any, expected_type: Any) -> Any:
"""Coerce an already-parsed JSON value to ``expected_type``.
Handles ``None``/``Optional``/``Union`` and ``list`` type hints recursively,
types exposing a ``from_json()`` classmethod, and dataclasses (including
nested dataclass fields). The destination type is always caller-supplied and
never derived from the payload, keeping deserialization secure.
"""
if expected_type is None or value is None:
return value
origin = typing.get_origin(expected_type)
if origin is not None:
return _coerce_generic(value, expected_type, origin)
if not isinstance(expected_type, type):
# Not a concrete, instantiable type (e.g. a typing special form we don't
# special-case) -- return the value unchanged.
return value
if isinstance(value, expected_type):
return value
from_json_hook = getattr(expected_type, "from_json", None)
if callable(from_json_hook):
return from_json_hook(value)
if dataclasses.is_dataclass(expected_type) and isinstance(value, dict):
return _build_dataclass(expected_type, cast(dict[str, Any], value))
type_ctor = cast(Any, expected_type)
try:
return type_ctor(value)
except Exception as e:
type_name = getattr(type_ctor, "__name__", None) or str(type_ctor)
raise TypeError(
f"Could not coerce value of type '{type(value).__name__}' to "
f"'{type_name}'"
) from e
def _coerce_generic(value: Any, expected_type: Any, origin: Any) -> Any:
args = typing.get_args(expected_type)
if origin is typing.Union or origin is types.UnionType:
# If the value already matches a member type, keep it as-is.
non_none = [a for a in args if a is not type(None)]
for arg in non_none:
if isinstance(arg, type) and isinstance(value, arg):
return value
# ``Optional[T]`` (exactly one non-None member): coerce to that member.
# For a genuine multi-member ``Union`` where the value matched none of
# the members, leave it untouched rather than guessing the first arg --
# forcing a coercion there can silently mis-construct the wrong type.
if len(non_none) == 1:
return coerce_to_type(value, non_none[0])
return value
if origin in (list, Sequence) and isinstance(value, list):
elem_type = args[0] if args else None
return [coerce_to_type(item, elem_type) for item in cast(list[Any], value)]
# Other generics (dict, tuple, ...) are returned as parsed JSON.
return value
def _build_dataclass(cls: Any, data: dict[str, Any]) -> Any:
"""Construct a dataclass from its dict payload, recursing into typed fields."""
try:
hints = typing.get_type_hints(cls)
except Exception:
hints = {}
kwargs: dict[str, Any] = {}
for field in dataclasses.fields(cls):
if field.name not in data:
continue
field_type = hints.get(field.name)
kwargs[field.name] = coerce_to_type(data[field.name], field_type)
return cls(**kwargs)