forked from Azure/azure-functions-python-library
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path_durable_functions.py
More file actions
389 lines (314 loc) · 13.2 KB
/
Copy path_durable_functions.py
File metadata and controls
389 lines (314 loc) · 13.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import json
import logging
import os
import sys
import warnings
from typing import Any, Callable, Optional, Union
from . import _abc
logger = logging.getLogger("azure.functions.DurableFunctions")
_STRICT_ENV_VAR = "AZURE_FUNCTIONS_DURABLE_STRICT_TYPING"
_TRUTHY = frozenset({"1", "true", "yes"})
_LEGACY_KEYS = frozenset({"__class__", "__module__", "__data__"})
# One-shot notice flags. Each becomes True after the corresponding
# advisory has been emitted in this process; tests may reset them.
_loose_codec_notice_emitted = False
_no_expected_type_notice_emitted = False
def _is_strict_mode() -> bool:
return os.environ.get(_STRICT_ENV_VAR, "").strip().lower() in _TRUTHY
def _notify_loose_codec_used() -> None:
"""Emit a one-time advisory the first time the loose-mode object_hook
path actually reconstructs a custom object in this process."""
global _loose_codec_notice_emitted
if _loose_codec_notice_emitted or _is_strict_mode():
return
_loose_codec_notice_emitted = True
msg = (
"azure.functions Durable JSON codec reconstructed a custom "
"object via the loose-mode object_hook path. Set "
"AZURE_FUNCTIONS_DURABLE_STRICT_TYPING=1 and supply "
"expected_type at decode call sites to enable type-validated "
"deserialization. This message will not be repeated."
)
logger.info(msg)
warnings.warn(msg, DeprecationWarning, stacklevel=2)
def _notify_no_expected_type() -> None:
"""Emit a one-time advisory the first time df_loads is called in
loose mode without an expected_type in this process."""
global _no_expected_type_notice_emitted
if _no_expected_type_notice_emitted or _is_strict_mode():
return
_no_expected_type_notice_emitted = True
msg = (
"azure.functions df_loads was called without expected_type. "
"Pass the destination type to enable validation and prepare "
"for strict typing (AZURE_FUNCTIONS_DURABLE_STRICT_TYPING=1). "
"This message will not be repeated."
)
logger.info(msg)
warnings.warn(msg, DeprecationWarning, stacklevel=2)
# Utilities
def _serialize_custom_object(obj):
"""Serialize a user-defined object to JSON.
This function gets called when `json.dumps` cannot serialize
an object and returns a serializable dictionary containing enough
metadata to recontrust the original object.
Parameters
----------
obj: Object
The object to serialize
Returns
-------
dict_obj: A serializable dictionary with enough metadata to reconstruct
`obj`
Exceptions
----------
TypeError:
Raise if `obj` does not contain a `to_json` attribute
"""
# 'safety' guard: raise error if object does not
# support serialization
if not hasattr(obj, "to_json"):
raise TypeError(f"class {type(obj)} does not expose a `to_json` "
"function")
# Encode to json using the object's `to_json`
obj_type = type(obj)
return {
"__class__": obj.__class__.__name__,
"__module__": obj.__module__,
"__data__": obj_type.to_json(obj)
}
def _deserialize_custom_object(obj: dict) -> object:
"""Deserialize a user-defined object from JSON.
Reconstructs a custom object from a dictionary that carries the
``{"__class__", "__module__", "__data__"}`` envelope produced by
:func:`_serialize_custom_object`. The class is resolved by looking
up ``__module__`` in :data:`sys.modules`; modules are never imported
on demand.
Parameters
----------
obj: dict
Dictionary that potentially encodes a custom class.
Returns
-------
object
Either the original ``obj`` dictionary (if it is not an
envelope) or the reconstructed custom object.
Raises
------
ValueError
If the declared module is not present in ``sys.modules``.
AttributeError
If the declared module is loaded but does not define the
declared class.
TypeError
If the resolved class does not expose a ``from_json`` function.
"""
if ("__class__" in obj) and ("__module__" in obj) and ("__data__" in obj):
class_name = obj.pop("__class__")
module_name = obj.pop("__module__")
obj_data = obj.pop("__data__")
# Resolve the class from already-loaded modules; this function
# does not import modules on demand.
module = sys.modules.get(module_name)
if module is None:
raise ValueError(
f"cannot deserialize custom object: module "
f"{module_name!r} is not loaded in sys.modules"
)
class_ = getattr(module, class_name, None)
if class_ is None:
raise AttributeError(
f"cannot deserialize custom object: class {class_name!r} "
f"not found in module {module_name!r}"
)
if not hasattr(class_, "from_json"):
raise TypeError(f"class {type(obj)} does not expose a `from_json` "
"function")
# Initialize the object using its `from_json` deserializer
obj = class_.from_json(obj_data)
_notify_loose_codec_used()
return obj
# ---------------------------------------------------------------------------
# Public Durable Functions JSON codec
# ---------------------------------------------------------------------------
def df_dumps(value: Any) -> str:
"""Serialize *value* to a JSON string.
In **loose mode** (default) this is equivalent to
``json.dumps(value, default=_serialize_custom_object)``: nested
custom objects are wrapped recursively in the
``{"__class__", "__module__", "__data__"}`` envelope.
In **strict mode** (``AZURE_FUNCTIONS_DURABLE_STRICT_TYPING`` set
to ``1``, ``true`` or ``yes``) only the top-level custom object is
wrapped; its ``__data__`` payload is serialized as plain JSON
without a ``default=`` hook. ``to_json()`` must therefore return
a value that is natively JSON-serializable, and ``TypeError`` is
raised if any nested value is not.
"""
if _is_strict_mode():
if hasattr(value, "to_json"):
envelope = _serialize_custom_object(value)
return json.dumps(envelope)
# Primitive / plain-JSON value -- serialize without default=.
return json.dumps(value)
return json.dumps(value, default=_serialize_custom_object)
def df_loads(s: str, expected_type: Optional[type] = None) -> Any:
"""Deserialize a JSON string, optionally validating against *expected_type*.
When *expected_type* is ``None``:
* **Loose mode** (default) runs
``json.loads(s, object_hook=_deserialize_custom_object)``. Custom
objects whose declaring module is already present in
``sys.modules`` are reconstructed; otherwise ``ValueError`` is
raised.
* **Strict mode** parses without an ``object_hook``. A legacy
custom-object envelope at the top level raises ``TypeError`` --
the caller must supply ``expected_type`` to deserialize custom
objects in strict mode.
When *expected_type* is provided the raw JSON is parsed first
(without an ``object_hook``) so the payload can be inspected before
any class lookup. On a class/module mismatch loose mode logs a
warning and strict mode raises ``TypeError``. In loose mode the
legacy ``object_hook`` path then runs (so nested custom objects are
also reconstructed); in strict mode the matching custom-object
payload is reconstructed by calling
``expected_type.from_json(raw["__data__"])`` directly.
"""
if expected_type is not None:
return _loads_with_expected_type(s, expected_type)
_notify_no_expected_type()
if _is_strict_mode():
return _loads_strict_no_type(s)
return json.loads(s, object_hook=_deserialize_custom_object)
def _get_serialize_default() -> Optional[Callable]:
"""Return the ``default`` callback for ``json.dumps``.
Intended for call sites that build their own ``json.dumps``
invocation (e.g. ``OrchestratorState.to_json_string``) and want to
honour the active typing mode. Returns ``_serialize_custom_object``
in loose mode and ``None`` in strict mode.
"""
if _is_strict_mode():
return None
return _serialize_custom_object
def _loads_strict_no_type(s: str) -> Any:
"""Strict-mode deserialization when no *expected_type* is supplied.
Parses *s* without an ``object_hook``. Returns the parsed value
unchanged for primitive / plain-JSON payloads; raises ``TypeError``
if the top-level value is a legacy custom-object envelope.
"""
raw = json.loads(s)
if _is_legacy_custom_dict(raw):
raise TypeError(
"df_loads: strict mode requires expected_type to "
"deserialize custom-object payloads, but none was provided. "
f"Payload declares {raw['__module__']}.{raw['__class__']}."
)
return raw
def _is_legacy_custom_dict(d: Any) -> bool:
"""Return True if *d* is a dict with legacy custom-object markers."""
return isinstance(d, dict) and _LEGACY_KEYS.issubset(d)
def _has_json_protocol(cls: type) -> bool:
"""Return True iff *cls* exposes callable ``to_json`` and ``from_json``."""
return callable(getattr(cls, "to_json", None)) and callable(
getattr(cls, "from_json", None)
)
def _is_compatible(value: Any, expected_type: type) -> bool:
"""Best-effort ``isinstance`` check that tolerates generic type hints."""
try:
return isinstance(value, expected_type)
except TypeError:
# typing constructs like List[int] aren't valid for isinstance.
return True
def _loads_with_expected_type(s: str, expected_type: type) -> Any:
"""Parse *s* and validate the result against *expected_type*.
The raw JSON is parsed without an ``object_hook`` so the payload
shape can be inspected before any class lookup. In strict mode a
matching custom-object payload is reconstructed via
``expected_type.from_json``; in loose mode the legacy
``object_hook`` path runs so nested custom objects inside
``__data__`` are also reconstructed.
"""
raw = json.loads(s)
strict = _is_strict_mode()
if _is_legacy_custom_dict(raw):
class_name = raw["__class__"]
module_name = raw["__module__"]
type_matches = (class_name == expected_type.__name__
and module_name == expected_type.__module__)
if not type_matches:
msg = (
f"df_loads: payload declares class "
f"{module_name}.{class_name} but expected "
f"{expected_type.__module__}.{expected_type.__name__}"
)
if strict:
raise TypeError(msg)
logger.warning(msg)
# Fall through to the object_hook path below.
if strict:
if not _has_json_protocol(expected_type):
raise TypeError(
f"df_loads: expected_type "
f"{expected_type.__module__}.{expected_type.__name__} "
f"does not expose from_json"
)
return expected_type.from_json(raw["__data__"])
# Loose mode -- use the object_hook path so nested custom
# objects inside __data__ are also reconstructed.
return json.loads(s, object_hook=_deserialize_custom_object)
# Primitive / plain-JSON payload -- validate the Python type.
if not _is_compatible(raw, expected_type):
msg = (
f"df_loads: deserialized value ({type(raw).__name__}) is not "
f"compatible with expected type {expected_type}"
)
if strict:
raise TypeError(msg)
logger.warning(msg)
if strict:
return raw
# Loose mode -- use the object_hook path so nested custom objects
# inside dicts/lists are reconstructed.
return json.loads(s, object_hook=_deserialize_custom_object)
class OrchestrationContext(_abc.OrchestrationContext):
"""A durable function orchestration context.
:param str body:
The body of orchestration context json.
"""
def __init__(self,
body: Union[str, bytes]) -> None:
if isinstance(body, str):
self.__body = body
if isinstance(body, bytes):
self.__body = body.decode('utf-8')
@property
def body(self) -> str:
return self.__body
def __repr__(self):
return (
f'<azure.OrchestrationContext '
f'body={self.body}>'
)
def __str__(self):
return self.__body
class EntityContext(_abc.OrchestrationContext):
"""A durable function entity context.
:param str body:
The body of orchestration context json.
"""
def __init__(self,
body: Union[str, bytes]) -> None:
if isinstance(body, str):
self.__body = body
if isinstance(body, bytes):
self.__body = body.decode('utf-8')
@property
def body(self) -> str:
return self.__body
def __repr__(self):
return (
f'<azure.EntityContext '
f'body={self.body}>'
)
def __str__(self):
return self.__body