Skip to content

Commit 5ee05ec

Browse files
committed
Add default timeout
1 parent 8bd0770 commit 5ee05ec

3 files changed

Lines changed: 27 additions & 9 deletions

File tree

src/asyncapi_python/kernel/endpoint/abc.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ class EndpointParams(TypedDict, total=False):
1414
"""Optional parameters for endpoint configuration"""
1515

1616
service_name: str # Service name for generating app_id
17+
default_rpc_timeout: (
18+
float | None
19+
) # Default timeout in seconds for RPC client requests (default: 180.0), or None to disable
1720
disable_handler_validation: bool # Opt-out of handler enforcement for testing
1821

1922

src/asyncapi_python/kernel/endpoint/rpc_client.py

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from typing import Generic
33
from uuid import uuid4
44

5-
from typing_extensions import Unpack
5+
from typing_extensions import NotRequired, Unpack
66

77
from asyncapi_python.kernel.wire import Producer
88

@@ -21,6 +21,13 @@ class RpcClient(AbstractEndpoint, Send[T_Input, T_Output], Generic[T_Input, T_Ou
2121
a single reply consumer and background task for efficiency.
2222
"""
2323

24+
class RouterInputs(Send.RouterInputs):
25+
"""Router inputs for RPC client, extending Send.RouterInputs with timeout"""
26+
27+
timeout: NotRequired[
28+
float | None
29+
] # Timeout in seconds for this RPC request, or None to disable timeout
30+
2431
def __init__(self, **kwargs: Unpack[AbstractEndpoint.Inputs]):
2532
super().__init__(**kwargs)
2633
# Instance-specific state
@@ -76,18 +83,17 @@ async def stop(self) -> None:
7683
if remaining_count == 0:
7784
await global_reply_handler.cleanup_if_last_instance()
7885

79-
async def __call__(
80-
self,
81-
payload: T_Input,
82-
/,
83-
timeout: float = 30.0,
84-
**kwargs: Unpack[Send.RouterInputs],
86+
async def __call__( # type: ignore[override]
87+
self, payload: T_Input, /, **kwargs: Unpack[RouterInputs]
8588
) -> T_Output:
8689
"""Send an RPC request and wait for response using global reply handling
8790
8891
Args:
8992
payload: The request payload to send
90-
timeout: Maximum time to wait for response (default 30 seconds)
93+
**kwargs: Router inputs including optional timeout:
94+
- Not provided: uses default_rpc_timeout from endpoint_params (default: 180.0)
95+
- float: uses the specified timeout in seconds
96+
- None: disables timeout (waits indefinitely)
9197
9298
Returns:
9399
The response payload
@@ -99,6 +105,14 @@ async def __call__(
99105
if not self._producer:
100106
raise UninitializedError()
101107

108+
# Determine timeout: use provided value, or fall back to endpoint_params default
109+
if "timeout" in kwargs:
110+
# Explicitly provided (could be float or None)
111+
timeout = kwargs["timeout"]
112+
else:
113+
# Not provided, use default from endpoint_params
114+
timeout = self._endpoint_params.get("default_rpc_timeout", 180.0)
115+
102116
# Generate correlation ID for this request
103117
correlation_id: str = str(uuid4())
104118

src/asyncapi_python/kernel/endpoint/rpc_reply_handler.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@
55
from typing import Any
66

77
from asyncapi_python.kernel.document import Channel, Operation
8-
from asyncapi_python.kernel.wire import AbstractWireFactory, Consumer, EndpointParams
8+
from asyncapi_python.kernel.wire import AbstractWireFactory, Consumer
99

1010
from ..typing import IncomingMessage
11+
from .abc import EndpointParams
1112

1213

1314
class GlobalRpcReplyHandler:

0 commit comments

Comments
 (0)