Skip to content

Commit b7cf40f

Browse files
committed
Move service_name to Application from wire
1 parent 66dcce7 commit b7cf40f

51 files changed

Lines changed: 312 additions & 200 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

src/asyncapi_python/contrib/codec/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,4 @@
22

33
from .registry import CodecRegistry
44

5-
65
__all__ = ["CodecRegistry"]

src/asyncapi_python/contrib/codec/json.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import json
2-
from typing import Type, ClassVar
32
from types import ModuleType
3+
from typing import ClassVar, Type
44

55
from pydantic import BaseModel, ValidationError
66

src/asyncapi_python/contrib/codec/registry.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1-
from typing import ClassVar, Any
21
from types import ModuleType
3-
from asyncapi_python.kernel.codec import CodecFactory, Codec
2+
from typing import Any, ClassVar
3+
4+
from asyncapi_python.kernel.codec import Codec, CodecFactory
45
from asyncapi_python.kernel.document.message import Message
6+
57
from .json import JsonCodecFactory
68

79

src/asyncapi_python/contrib/wire/amqp/consumer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@
66
try:
77
from aio_pika import ExchangeType # type: ignore[import-not-found]
88
from aio_pika.abc import ( # type: ignore[import-not-found]
9-
AbstractConnection,
109
AbstractChannel,
11-
AbstractQueue,
10+
AbstractConnection,
1211
AbstractExchange,
12+
AbstractQueue,
1313
)
1414
except ImportError as e:
1515
raise ImportError(

src/asyncapi_python/contrib/wire/amqp/factory.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
"""AMQP wire factory implementation"""
22

33
import secrets
4-
from typing import Optional, Callable, Any, cast
4+
from typing import Any, Callable, Optional, cast
5+
56
from typing_extensions import Unpack
67

78
try:
@@ -13,11 +14,11 @@
1314
) from e
1415

1516
from asyncapi_python.kernel.wire import AbstractWireFactory, EndpointParams
16-
from asyncapi_python.kernel.wire.typing import Producer, Consumer
17+
from asyncapi_python.kernel.wire.typing import Consumer, Producer
1718

18-
from .message import AmqpWireMessage, AmqpIncomingMessage
19-
from .producer import AmqpProducer
2019
from .consumer import AmqpConsumer
20+
from .message import AmqpIncomingMessage, AmqpWireMessage
21+
from .producer import AmqpProducer
2122
from .resolver import resolve_amqp_config
2223

2324

@@ -135,8 +136,12 @@ async def create_consumer(
135136
# Generate operation name from available information
136137
operation_name = self._generate_operation_name(kwargs)
137138

139+
# Use provided app_id if available, otherwise use instance app_id
140+
# This allows application-level control over queue naming
141+
app_id = kwargs.get("app_id", self._app_id)
142+
138143
# Resolve AMQP configuration using pattern matching
139-
config = resolve_amqp_config(kwargs, operation_name, self._app_id)
144+
config = resolve_amqp_config(kwargs, operation_name, app_id)
140145

141146
connection = await self._get_connection()
142147

@@ -154,8 +159,12 @@ async def create_producer(
154159
# Generate operation name from available information
155160
operation_name = self._generate_operation_name(kwargs)
156161

162+
# Use provided app_id if available, otherwise use instance app_id
163+
# This allows application-level control over queue naming
164+
app_id = kwargs.get("app_id", self._app_id)
165+
157166
# Resolve AMQP configuration using pattern matching
158-
config = resolve_amqp_config(kwargs, operation_name, self._app_id)
167+
config = resolve_amqp_config(kwargs, operation_name, app_id)
159168

160169
connection = await self._get_connection()
161170

src/asyncapi_python/contrib/wire/amqp/resolver.py

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
"""Binding resolution with comprehensive pattern matching"""
22

33
from typing import Any
4-
from asyncapi_python.kernel.wire import EndpointParams
5-
from asyncapi_python.kernel.document.channel import Channel
4+
65
from asyncapi_python.kernel.document.bindings import AmqpChannelBinding
6+
from asyncapi_python.kernel.document.channel import Channel
7+
from asyncapi_python.kernel.wire import EndpointParams
78

8-
from .config import AmqpConfig, AmqpBindingType
9-
from .utils import validate_parameters_strict, substitute_parameters
9+
from .config import AmqpBindingType, AmqpConfig
10+
from .utils import substitute_parameters, validate_parameters_strict
1011

1112

1213
def resolve_amqp_config(
@@ -57,17 +58,32 @@ def resolve_amqp_config(
5758
},
5859
)
5960

60-
# Reply channel with explicit address - shared channel with filtering
61+
# Reply channel with explicit address - check if direct queue or topic exchange
6162
case (True, _, address, _) if address:
6263
resolved_address = substitute_parameters(address, param_values)
63-
return AmqpConfig(
64-
queue_name=f"reply-{app_id}", # App-specific reply queue
65-
exchange_name=resolved_address, # Shared exchange for replies
66-
exchange_type="topic", # Enable pattern matching for filtering
67-
routing_key=app_id, # Filter messages by app_id
68-
binding_type=AmqpBindingType.REPLY,
69-
queue_properties={"durable": True, "exclusive": False},
70-
)
64+
# If address starts with "reply-", treat it as a direct queue name (RPC pattern)
65+
if resolved_address.startswith("reply-"):
66+
return AmqpConfig(
67+
queue_name=resolved_address, # Use address as queue name
68+
exchange_name="", # Default exchange for direct routing
69+
routing_key=resolved_address, # Route directly to queue
70+
binding_type=AmqpBindingType.REPLY,
71+
queue_properties={
72+
"durable": False,
73+
"exclusive": True,
74+
"auto_delete": True,
75+
},
76+
)
77+
else:
78+
# Topic-based reply pattern - shared exchange with filtering
79+
return AmqpConfig(
80+
queue_name=f"reply-{app_id}", # App-specific reply queue
81+
exchange_name=resolved_address, # Shared exchange for replies
82+
exchange_type="topic", # Enable pattern matching for filtering
83+
routing_key=app_id, # Filter messages by app_id
84+
binding_type=AmqpBindingType.REPLY,
85+
queue_properties={"durable": True, "exclusive": False},
86+
)
7187

7288
# Reply channel with binding - defer to binding resolution
7389
case (True, binding, _, _) if binding and binding.type == "queue":

src/asyncapi_python/contrib/wire/amqp/utils.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
# TODO: This thing should be general wire utils, not tied to specific wire
44

55
import re
6+
67
from asyncapi_python.kernel.document.channel import Channel
78

89

src/asyncapi_python/kernel/application.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,44 @@
11
import asyncio
2-
from typing import TypedDict, Any
3-
from typing_extensions import Unpack, Required, NotRequired
2+
from typing import Any, TypedDict
3+
4+
from typing_extensions import NotRequired, Required, Unpack
45

56
from asyncapi_python.kernel.document.operation import Operation
67
from asyncapi_python.kernel.wire import AbstractWireFactory
8+
9+
from .codec import CodecFactory
710
from .endpoint import AbstractEndpoint, EndpointFactory
811
from .endpoint.abc import EndpointParams
9-
from .codec import CodecFactory
1012

1113

1214
class BaseApplication:
1315
class Inputs(TypedDict):
1416
wire_factory: Required[AbstractWireFactory[Any, Any]]
1517
codec_factory: Required[CodecFactory[Any, Any]]
18+
service_name: NotRequired[str]
1619
endpoint_params: NotRequired[EndpointParams]
1720

1821
def __init__(self, **kwargs: Unpack[Inputs]) -> None:
1922
self.__endpoints: set[AbstractEndpoint] = set()
2023
self.__wire_factory: AbstractWireFactory[Any, Any] = kwargs["wire_factory"]
2124
self.__codec_factory: CodecFactory[Any, Any] = kwargs["codec_factory"]
25+
self.__service_name: str = kwargs.get("service_name", "app")
2226
self.__endpoint_params: EndpointParams = kwargs.get("endpoint_params", {})
2327
self._stop_event: asyncio.Event | None = None
2428
self._monitor_task: asyncio.Task[None] | None = None
2529
self._exception_future: asyncio.Future[Exception] | None = None
2630

31+
@property
32+
def service_name(self) -> str:
33+
"""Get the service name for this application"""
34+
return self.__service_name
35+
2736
def _register_endpoint(self, op: Operation) -> AbstractEndpoint:
2837
endpoint = EndpointFactory.create(
2938
operation=op,
3039
wire_factory=self.__wire_factory,
3140
codec_factory=self.__codec_factory,
41+
service_name=self.__service_name,
3242
endpoint_params=self.__endpoint_params,
3343
)
3444
self.__endpoints.add(endpoint)

src/asyncapi_python/kernel/codec.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from typing import Generic, Protocol
44

55
from asyncapi_python.kernel.document.message import Message
6+
67
from .typing import T_DecodedPayload, T_EncodedPayload
78

89

src/asyncapi_python/kernel/document/__init__.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
from .bindings import (
2+
AmqpChannelBinding,
3+
AmqpExchange,
4+
AmqpExchangeType,
5+
AmqpOperationBinding,
6+
AmqpQueue,
7+
)
18
from .channel import AddressParameter, Channel, ChannelBindings
29
from .common import ExternalDocs, Server, Tag
310
from .message import (
@@ -15,13 +22,6 @@
1522
OperationTrait,
1623
SecurityScheme,
1724
)
18-
from .bindings import (
19-
AmqpChannelBinding,
20-
AmqpOperationBinding,
21-
AmqpExchange,
22-
AmqpQueue,
23-
AmqpExchangeType,
24-
)
2525

2626
__all__ = [
2727
# channel

0 commit comments

Comments
 (0)