-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcodec.py
More file actions
35 lines (24 loc) · 1.23 KB
/
codec.py
File metadata and controls
35 lines (24 loc) · 1.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from abc import ABC, abstractmethod
from types import ModuleType
from typing import Generic, Protocol
from asyncapi_python.kernel.document.message import Message
from .typing import T_DecodedPayload, T_EncodedPayload
class Codec(Protocol, Generic[T_DecodedPayload, T_EncodedPayload]):
def encode(self, payload: T_DecodedPayload) -> T_EncodedPayload: ...
def decode(self, payload: T_EncodedPayload) -> T_DecodedPayload: ...
class CodecFactory(ABC, Generic[T_DecodedPayload, T_EncodedPayload]):
"""A codec factory
Args:
module (ModuleType): a root module where the generated code of the application lies.
Notes:
This essentially couples codec factory with the corresponding compiler (options).
All assumptions regarding message type positioning must be clearly documented.
"""
def __init__(self, module: ModuleType):
self._module = module
@abstractmethod
def create(self, message: Message) -> Codec[T_DecodedPayload, T_EncodedPayload]:
"""Creates codec instance from the message spec.
The factory will dynamically import data model object based on the root module and the
code generated, and will construct a codec implementation for this message.
"""