-
-
Notifications
You must be signed in to change notification settings - Fork 199
Expand file tree
/
Copy pathcoder.py
More file actions
148 lines (119 loc) · 4.42 KB
/
coder.py
File metadata and controls
148 lines (119 loc) · 4.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import datetime
import json
import pickle # nosec:B403
from decimal import Decimal
from typing import (
Any,
Callable,
ClassVar,
Dict,
Optional,
TypeVar,
Union,
overload,
)
import pendulum
from fastapi.encoders import jsonable_encoder
from pydantic import BaseConfig, ValidationError
from starlette.responses import JSONResponse
from starlette.templating import (
_TemplateResponse as TemplateResponse, # pyright: ignore[reportPrivateUsage]
)
from pydantic.version import VERSION as PYDANTIC_VERSION
PYDANTIC_V2 = PYDANTIC_VERSION.startswith("2.")
if PYDANTIC_V2:
from fastapi._compat import ModelField
else:
from pydantic.fields import ModelField
_T = TypeVar("_T", bound=type)
CONVERTERS: Dict[str, Callable[[str], Any]] = {
# Pendulum 3.0.0 adds parse to __all__, at which point these ignores can be removed
"date": lambda x: pendulum.parse(x, exact=True), # type: ignore[attr-defined]
"datetime": lambda x: pendulum.parse(x, exact=True), # type: ignore[attr-defined]
"decimal": Decimal,
}
class JsonEncoder(json.JSONEncoder):
def default(self, o: Any) -> Any:
if isinstance(o, datetime.datetime):
return {"val": str(o), "_spec_type": "datetime"}
elif isinstance(o, datetime.date):
return {"val": str(o), "_spec_type": "date"}
elif isinstance(o, Decimal):
return {"val": str(o), "_spec_type": "decimal"}
else:
return jsonable_encoder(o)
def object_hook(obj: Any) -> Any:
_spec_type = obj.get("_spec_type")
if not _spec_type:
return obj
if _spec_type in CONVERTERS:
return CONVERTERS[_spec_type](obj["val"])
else:
raise TypeError(f"Unknown {_spec_type}")
class Coder:
@classmethod
def encode(cls, value: Any) -> bytes:
raise NotImplementedError
@classmethod
def decode(cls, value: bytes) -> Any:
raise NotImplementedError
# (Shared) cache for endpoint return types to Pydantic model fields.
# Note that subclasses share this cache! If a subclass overrides the
# decode_as_type method and then stores a different kind of field for a
# given type, do make sure that the subclass provides its own class
# attribute for this cache.
_type_field_cache: ClassVar[Dict[Any, ModelField]] = {}
@overload
@classmethod
def decode_as_type(cls, value: bytes, *, type_: _T) -> _T:
...
@overload
@classmethod
def decode_as_type(cls, value: bytes, *, type_: None) -> Any:
...
@classmethod
def decode_as_type(cls, value: bytes, *, type_: Optional[_T]) -> Union[_T, Any]:
"""Decode value to the specific given type
The default implementation uses the Pydantic model system to convert the value.
"""
result = cls.decode(value)
if type_ is not None:
try:
field = cls._type_field_cache[type_]
except KeyError:
field = cls._type_field_cache[type_] = ModelField(
name="body", type_=type_, class_validators=None, model_config=BaseConfig
)
result, errors = field.validate(result, {}, loc=())
if errors is not None:
if not isinstance(errors, list):
errors = [errors]
raise ValidationError(errors, type_)
return result
class JsonCoder(Coder):
@classmethod
def encode(cls, value: Any) -> bytes:
if isinstance(value, JSONResponse):
return value.body
return json.dumps(value, cls=JsonEncoder).encode()
@classmethod
def decode(cls, value: bytes) -> Any:
# explicitly decode from UTF-8 bytes first, as otherwise
# json.loads() will first have to detect the correct UTF-
# encoding used.
return json.loads(value.decode(), object_hook=object_hook)
class PickleCoder(Coder):
@classmethod
def encode(cls, value: Any) -> bytes:
if isinstance(value, TemplateResponse):
value = value.body
return pickle.dumps(value)
@classmethod
def decode(cls, value: bytes) -> Any:
return pickle.loads(value) # noqa: S301
@classmethod
def decode_as_type(cls, value: bytes, *, type_: Optional[_T]) -> Any:
# Pickle already produces the correct type on decoding, no point
# in paying an extra performance penalty for pydantic to discover
# the same.
return cls.decode(value)