-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathapplication.py.j2
More file actions
76 lines (63 loc) · 2.92 KB
/
application.py.j2
File metadata and controls
76 lines (63 loc) · 2.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
"""Generated AsyncAPI application."""
from __future__ import annotations
from typing import Any
from asyncapi_python.kernel.application import BaseApplication
from asyncapi_python.kernel.wire import AbstractWireFactory
from asyncapi_python.kernel.codec import CodecFactory
from asyncapi_python.contrib.codec.registry import CodecRegistry
from asyncapi_python.kernel.endpoint import AbstractEndpoint
from asyncapi_python.kernel.endpoint.abc import EndpointParams
from .router import ProducerRouter, ConsumerRouter
import sys
class Application(BaseApplication):
"""{{ app_title }} - {{ app_description }}
AsyncAPI Version: {{ asyncapi_version }}
Application Version: {{ app_version }}
"""
def __init__(
self,
wire_factory: AbstractWireFactory[Any, Any],
*,
endpoint_params: EndpointParams | None = None,
):
"""Initialize the AsyncAPI application.
Args:
wire_factory: Wire protocol factory for message transport
endpoint_params: Optional endpoint configuration (service_name, default_rpc_timeout, etc.)
"""
# Use CodecRegistry with current module for message serialization
current_module = sys.modules[self.__module__.rsplit(".", 1)[0]]
codec_factory = CodecRegistry(current_module)
# Pass endpoint_params to BaseApplication if provided
if endpoint_params is not None:
super().__init__(
wire_factory=wire_factory,
codec_factory=codec_factory,
endpoint_params=endpoint_params,
)
else:
super().__init__(wire_factory=wire_factory, codec_factory=codec_factory)
# Initialize semantic routers with factories
self.producer = ProducerRouter(wire_factory, codec_factory)
self.consumer = ConsumerRouter(wire_factory, codec_factory)
# Register all endpoints from routers
self._register_router_endpoints(self.producer)
self._register_router_endpoints(self.consumer)
def _register_router_endpoints(self, router: object) -> None:
"""Recursively register all endpoints from router tree.
Args:
router: Router object to scan for endpoints
"""
if isinstance(router, AbstractEndpoint):
# This router is an endpoint - register it directly
self._add_endpoint(router)
elif hasattr(router, "__dict__"):
# This router aggregates others - recurse through attributes
for attr_name in dir(router):
if not attr_name.startswith("_"):
attr = getattr(router, attr_name, None)
# Check if it's a router-like object (has __dict__ or is an endpoint)
if attr is not None and (
isinstance(attr, AbstractEndpoint) or hasattr(attr, "__dict__")
):
self._register_router_endpoints(attr)