Skip to content

Commit 8bd0770

Browse files
committed
Move service name to endpoint params
1 parent b3f4ffa commit 8bd0770

6 files changed

Lines changed: 22 additions & 21 deletions

File tree

src/asyncapi_python/kernel/application.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,30 +15,22 @@ class BaseApplication:
1515
class Inputs(TypedDict):
1616
wire_factory: Required[AbstractWireFactory[Any, Any]]
1717
codec_factory: Required[CodecFactory[Any, Any]]
18-
service_name: NotRequired[str]
1918
endpoint_params: NotRequired[EndpointParams]
2019

2120
def __init__(self, **kwargs: Unpack[Inputs]) -> None:
2221
self.__endpoints: set[AbstractEndpoint] = set()
2322
self.__wire_factory: AbstractWireFactory[Any, Any] = kwargs["wire_factory"]
2423
self.__codec_factory: CodecFactory[Any, Any] = kwargs["codec_factory"]
25-
self.__service_name: str = kwargs.get("service_name", "app")
2624
self.__endpoint_params: EndpointParams = kwargs.get("endpoint_params", {})
2725
self._stop_event: asyncio.Event | None = None
2826
self._monitor_task: asyncio.Task[None] | None = None
2927
self._exception_future: asyncio.Future[Exception] | None = None
3028

31-
@property
32-
def service_name(self) -> str:
33-
"""Get the service name for this application"""
34-
return self.__service_name
35-
3629
def _register_endpoint(self, op: Operation) -> AbstractEndpoint:
3730
endpoint = EndpointFactory.create(
3831
operation=op,
3932
wire_factory=self.__wire_factory,
4033
codec_factory=self.__codec_factory,
41-
service_name=self.__service_name,
4234
endpoint_params=self.__endpoint_params,
4335
)
4436
self.__endpoints.add(endpoint)

src/asyncapi_python/kernel/endpoint/abc.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,11 @@
1010
from ..typing import BatchConfig, Handler, T_Input, T_Output
1111

1212

13-
class EndpointParams(TypedDict):
13+
class EndpointParams(TypedDict, total=False):
1414
"""Optional parameters for endpoint configuration"""
1515

16-
disable_handler_validation: NotRequired[
17-
bool
18-
] # Opt-out of handler enforcement for testing
16+
service_name: str # Service name for generating app_id
17+
disable_handler_validation: bool # Opt-out of handler enforcement for testing
1918

2019

2120
class HandlerParams(TypedDict):
@@ -31,7 +30,6 @@ class Inputs(TypedDict):
3130
operation: Required[Operation]
3231
wire_factory: Required[AbstractWireFactory[Any, Any]]
3332
codec_factory: Required[CodecFactory[Any, Any]]
34-
service_name: NotRequired[str] # Service name for app_id generation
3533
endpoint_params: NotRequired[EndpointParams] # Optional endpoint configuration
3634

3735
class StartParams(TypedDict):
@@ -43,7 +41,6 @@ class StartParams(TypedDict):
4341
def __init__(self, **kwargs: Unpack[Inputs]):
4442
self._operation = kwargs["operation"]
4543
self._wire = kwargs["wire_factory"]
46-
self._service_name = kwargs.get("service_name", "app")
4744
codec_factory = kwargs["codec_factory"]
4845
# Endpoint sets its own defaults - empty dict if not provided
4946
self._endpoint_params = kwargs.get("endpoint_params", {})

src/asyncapi_python/kernel/endpoint/rpc_client.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,16 +45,19 @@ async def start(self, **params: Unpack[AbstractEndpoint.StartParams]) -> None:
4545

4646
# Ensure global reply handling is set up (only happens once)
4747
await global_reply_handler.ensure_reply_handler(
48-
self._wire, self._operation, self._service_name
48+
self._wire, self._operation, self._endpoint_params
4949
)
5050

51+
# Extract service_name from endpoint_params for app_id
52+
service_name = self._endpoint_params.get("service_name", "app")
53+
5154
# Create instance-specific producer for sending requests
5255
self._producer = await self._wire.create_producer(
5356
channel=self._operation.channel,
5457
parameters={},
5558
op_bindings=self._operation.bindings,
5659
is_reply=False,
57-
app_id=self._service_name,
60+
app_id=service_name,
5861
)
5962

6063
# Start producer

src/asyncapi_python/kernel/endpoint/rpc_reply_handler.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from typing import Any
66

77
from asyncapi_python.kernel.document import Channel, Operation
8-
from asyncapi_python.kernel.wire import AbstractWireFactory, Consumer
8+
from asyncapi_python.kernel.wire import AbstractWireFactory, Consumer, EndpointParams
99

1010
from ..typing import IncomingMessage
1111

@@ -29,16 +29,19 @@ async def ensure_reply_handler(
2929
self,
3030
wire_factory: AbstractWireFactory[Any, Any],
3131
operation: Operation,
32-
service_name: str = "app",
32+
endpoint_params: EndpointParams,
3333
) -> None:
3434
"""Ensure reply consumer and task are running
3535
3636
Args:
3737
wire_factory: Wire factory for creating consumer
3838
operation: Operation definition
39-
service_name: Service name for generating consistent app_id
39+
endpoint_params: Endpoint parameters including service_name
4040
"""
4141
if self._reply_consumer is None:
42+
# Extract service_name from endpoint_params
43+
service_name = endpoint_params.get("service_name", "app")
44+
4245
# Generate app_id with service name + random hex (same format as AmqpWire)
4346
random_hex = secrets.token_hex(4) # 4 bytes = 8 hex chars
4447
app_id = f"{service_name}-{random_hex}"

tests/integration/scenarios/fan_in_logging.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,14 @@ def __init__(
2727
wire_factory: AbstractWireFactory,
2828
codec_factory: CodecFactory,
2929
):
30+
# Pass service_name via endpoint_params
31+
endpoint_params = {"service_name": service_name}
3032
super().__init__(
3133
wire_factory=wire_factory,
3234
codec_factory=codec_factory,
33-
service_name=service_name,
35+
endpoint_params=endpoint_params,
3436
)
37+
self.service_name = service_name # Store for use in _setup_endpoints
3538
self._setup_endpoints()
3639

3740
def _setup_endpoints(self):

tests/integration/scenarios/fan_out_broadcasting.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,14 @@ def __init__(
108108
wire_factory: AbstractWireFactory,
109109
codec_factory: CodecFactory,
110110
):
111+
# Pass service_name via endpoint_params
112+
endpoint_params = {"service_name": service_name}
111113
super().__init__(
112114
wire_factory=wire_factory,
113115
codec_factory=codec_factory,
114-
service_name=service_name,
116+
endpoint_params=endpoint_params,
115117
)
118+
self.service_name = service_name # Store for use in _setup_endpoints
116119
self._setup_endpoints()
117120

118121
def _setup_endpoints(self):

0 commit comments

Comments
 (0)