-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathregistry.py
More file actions
97 lines (74 loc) · 3.78 KB
/
registry.py
File metadata and controls
97 lines (74 loc) · 3.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
from types import ModuleType
from typing import Any, ClassVar
from asyncapi_python.kernel.codec import Codec, CodecFactory
from asyncapi_python.kernel.document.message import Message
from .json import JsonCodecFactory
class CodecRegistry(CodecFactory[Any, Any]):
"""A registry-based codec factory that routes messages to appropriate codecs by content type.
This factory maintains a class-level registry of codec factories mapped to content types,
and creates codec instances on demand. It supports fallback to a default codec when
no specific codec is registered for a content type.
Example:
>>> # Register codec factories for different content types
>>> CodecRegistry.register("application/json", JsonCodecFactory)
>>> CodecRegistry.register("application/xml", XmlCodecFactory)
>>>
>>> # Create registry instance and use it
>>> registry = CodecRegistry(my_module)
>>> codec = registry.create(json_message) # Returns JSON codec
>>> codec = registry.create(xml_message) # Returns XML codec
"""
_registry: ClassVar[dict[str | None, type[CodecFactory[Any, Any]]]] = {}
"""Class-level registry mapping content types to codec factory classes."""
def __init__(self, module: ModuleType) -> None:
"""Initialize the codec registry.
Args:
module: The root module containing generated message classes.
"""
super().__init__(module)
self._codecs: dict[str | None, CodecFactory[Any, Any]] = {}
@classmethod
def register(
cls, content_type: str | None, codec_factory: type[CodecFactory[Any, Any]], /
) -> None:
"""Register a codec factory for a specific content type.
Args:
content_type: The MIME content type (e.g., "application/json") or None for default.
codec_factory: The codec factory class to use for this content type.
Example:
>>> CodecRegistry.register("application/json", JsonCodecFactory)
>>> CodecRegistry.register(None, JsonCodecFactory) # Default fallback
"""
cls._registry[content_type] = codec_factory
def create(self, message: Message) -> Codec[Any, Any]:
"""Creates codec instance from the message specification.
Looks up the appropriate codec factory based on the message's content type,
creates and caches codec factory instances, then delegates codec creation
to the specific factory.
Args:
message: The AsyncAPI message specification containing content type info.
Returns:
A codec instance capable of encoding/decoding the message.
Raises:
ValueError: If no codec is registered for the message's content type
and no default codec is available.
Example:
>>> message = Message(content_type="application/json", ...)
>>> codec = registry.create(message)
>>> encoded = codec.encode(my_data)
"""
content_type = message.content_type
# Get or create codec instance for this content type
if content_type not in self._codecs:
codec_factory_class = self._registry.get(content_type)
if codec_factory_class is None:
# Fallback to default (None) content type
codec_factory_class = self._registry.get(None)
if codec_factory_class is None:
raise ValueError(
f"No codec registered for content type: {content_type}"
)
self._codecs[content_type] = codec_factory_class(self._module)
return self._codecs[content_type].create(message)
CodecRegistry.register(None, JsonCodecFactory)
CodecRegistry.register("application/json", JsonCodecFactory)