-
Notifications
You must be signed in to change notification settings - Fork 44
Expand file tree
/
Copy pathairbyte_protocol_serializers.py
More file actions
56 lines (46 loc) · 2.12 KB
/
airbyte_protocol_serializers.py
File metadata and controls
56 lines (46 loc) · 2.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
import sys
from typing import TYPE_CHECKING, Any, Dict
# Select the serialization backend that provides CustomType and Serializer.
if TYPE_CHECKING:
    # Static type checkers always resolve the names against serpyco_rs.
    from serpyco_rs import CustomType, Serializer
else:
    # At runtime, serpyco_rs is used on every platform except "emscripten",
    # where the serpyco package is imported instead.
    # NOTE(review): presumably serpyco_rs is unavailable on emscripten — confirm.
    USE_RUST_BACKEND = sys.platform != "emscripten"
    if USE_RUST_BACKEND:
        from serpyco_rs import CustomType, Serializer
    else:
        from serpyco import CustomType, Serializer
from .airbyte_protocol import ( # type: ignore[attr-defined] # all classes are imported to airbyte_protocol via *
AirbyteCatalog,
AirbyteMessage,
AirbyteStateBlob,
AirbyteStateMessage,
AirbyteStream,
AirbyteStreamState,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
ConnectorSpecification,
)
class AirbyteStateBlobType(CustomType[AirbyteStateBlob, Dict[str, Any]]):
    """Custom type that converts ``AirbyteStateBlob`` to and from a plain dict."""

    def serialize(self, value: AirbyteStateBlob) -> Dict[str, Any]:
        """Return a shallow copy of the blob's instance attributes as a dict."""
        # orjson.dumps() can't be used directly here because private attributes
        # are excluded, e.g. "__ab_full_refresh_sync_complete".
        blob_attributes = value.__dict__
        return dict(blob_attributes)

    def deserialize(self, value: Dict[str, Any]) -> AirbyteStateBlob:
        """Build an ``AirbyteStateBlob`` from the given dict."""
        state_blob = AirbyteStateBlob(value)
        return state_blob

    def get_json_schema(self) -> Dict[str, Any]:
        """Return the JSON schema for the serialized form: a generic object."""
        schema = {"type": "object"}
        return schema
def custom_type_resolver(t: type) -> CustomType[AirbyteStateBlob, Dict[str, Any]] | None:
    """Return the custom type handler for ``t``, if one is defined.

    Only ``AirbyteStateBlob`` itself (identity comparison) gets a handler;
    every other type yields ``None``.
    """
    if t is not AirbyteStateBlob:
        return None
    return AirbyteStateBlobType()
# Module-level serializer instances, one per protocol model.
# Every serializer is constructed with omit_none=True.
AirbyteCatalogSerializer = Serializer(
    AirbyteCatalog,
    omit_none=True,
)
AirbyteStreamSerializer = Serializer(
    AirbyteStream,
    omit_none=True,
)

# These three additionally receive custom_type_resolver, which supplies the
# AirbyteStateBlob handler.
AirbyteStreamStateSerializer = Serializer(
    AirbyteStreamState,
    omit_none=True,
    custom_type_resolver=custom_type_resolver,
)
AirbyteStateMessageSerializer = Serializer(
    AirbyteStateMessage,
    omit_none=True,
    custom_type_resolver=custom_type_resolver,
)
AirbyteMessageSerializer = Serializer(
    AirbyteMessage,
    omit_none=True,
    custom_type_resolver=custom_type_resolver,
)

ConfiguredAirbyteCatalogSerializer = Serializer(
    ConfiguredAirbyteCatalog,
    omit_none=True,
)
ConfiguredAirbyteStreamSerializer = Serializer(
    ConfiguredAirbyteStream,
    omit_none=True,
)
ConnectorSpecificationSerializer = Serializer(
    ConnectorSpecification,
    omit_none=True,
)