Skip to content

Commit eb13b3d

Browse files
authored
Replaced async functions with actual returned Future types. (#35)
1 parent c86ca99 commit eb13b3d

File tree

8 files changed

+139
-131
lines changed

8 files changed

+139
-131
lines changed

python/natsrpy/_natsrpy_rs/__init__.pyi

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from asyncio import Future
12
from collections.abc import Awaitable, Callable
23
from datetime import timedelta
34
from typing import Any, final, overload
@@ -42,8 +43,8 @@ class IteratorSubscription:
4243
"""
4344

4445
def __aiter__(self) -> IteratorSubscription: ...
45-
async def __anext__(self) -> Message: ...
46-
async def next(self, timeout: float | timedelta | None = None) -> Message:
46+
def __anext__(self) -> Future[Message]: ...
47+
def next(self, timeout: float | timedelta | None = None) -> Future[Message]:
4748
"""Receive the next message from the subscription.
4849
4950
:param timeout: maximum time to wait for a message in seconds
@@ -53,14 +54,14 @@ class IteratorSubscription:
5354
unsubscribed.
5455
"""
5556

56-
async def unsubscribe(self, limit: int | None = None) -> None:
57+
def unsubscribe(self, limit: int | None = None) -> Future[None]:
5758
"""Unsubscribe from the subject.
5859
5960
:param limit: if set, automatically unsubscribe after receiving
6061
this many additional messages, defaults to None.
6162
"""
6263

63-
async def drain(self) -> None:
64+
def drain(self) -> Future[None]:
6465
"""Drain the subscription.
6566
6667
Unsubscribes and flushes any remaining messages before closing.
@@ -74,14 +75,14 @@ class CallbackSubscription:
7475
Messages are automatically delivered to the callback in a background task.
7576
"""
7677

77-
async def unsubscribe(self, limit: int | None = None) -> None:
78+
def unsubscribe(self, limit: int | None = None) -> Future[None]:
7879
"""Unsubscribe from the subject.
7980
8081
:param limit: if set, automatically unsubscribe after receiving
8182
this many additional messages, defaults to None.
8283
"""
8384

84-
async def drain(self) -> None:
85+
def drain(self) -> Future[None]:
8586
"""Drain the subscription.
8687
8788
Unsubscribes and flushes any remaining messages before closing.
@@ -132,30 +133,30 @@ class Nats:
132133
in seconds or as a timedelta, defaults to 10 seconds.
133134
"""
134135

135-
async def startup(self) -> None:
136+
def startup(self) -> Future[None]:
136137
"""Connect to the NATS server.
137138
138139
Establishes the connection using the parameters provided at
139140
construction time. Must be called before any publish, subscribe,
140141
or JetStream operations.
141142
"""
142143

143-
async def shutdown(self) -> None:
144+
def shutdown(self) -> Future[None]:
144145
"""Close the NATS connection.
145146
146147
Drains all subscriptions and flushes pending data before
147148
disconnecting.
148149
"""
149150

150-
async def publish(
151+
def publish(
151152
self,
152153
subject: str,
153154
payload: bytes | str | bytearray | memoryview,
154155
*,
155156
headers: dict[str, Any] | None = None,
156157
reply: str | None = None,
157158
err_on_disconnect: bool = False,
158-
) -> None:
159+
) -> Future[None]:
159160
"""Publish a message to a subject.
160161
161162
:param subject: subject to publish the message to.
@@ -167,15 +168,15 @@ class Nats:
167168
is disconnected, defaults to False.
168169
"""
169170

170-
async def request(
171+
def request(
171172
self,
172173
subject: str,
173174
payload: bytes | str | bytearray | memoryview,
174175
*,
175176
headers: dict[str, Any] | None = None,
176177
inbox: str | None = None,
177178
timeout: float | timedelta | None = None,
178-
) -> Message:
179+
) -> Future[Message]:
179180
"""Send a request and discard the response.
180181
181182
:param subject: subject to send the request to.
@@ -188,31 +189,31 @@ class Nats:
188189
:return: response message.
189190
"""
190191

191-
async def drain(self) -> None:
192+
def drain(self) -> Future[None]:
192193
"""Drain the connection.
193194
194195
Gracefully closes all subscriptions and flushes pending messages.
195196
"""
196197

197-
async def flush(self) -> None:
198+
def flush(self) -> Future[None]:
198199
"""Flush the connection.
199200
200201
Waits until all pending messages have been sent to the server.
201202
"""
202203

203204
@overload
204-
async def subscribe(
205+
def subscribe(
205206
self,
206207
subject: str,
207208
callback: Callable[[Message], Awaitable[None]],
208-
) -> CallbackSubscription: ...
209+
) -> Future[CallbackSubscription]: ...
209210
@overload
210-
async def subscribe(
211+
def subscribe(
211212
self,
212213
subject: str,
213214
callback: None = None,
214-
) -> IteratorSubscription: ...
215-
async def jetstream(
215+
) -> Future[IteratorSubscription]: ...
216+
def jetstream(
216217
self,
217218
*,
218219
domain: str | None = None,
@@ -222,7 +223,7 @@ class Nats:
222223
concurrency_limit: int | None = None,
223224
max_ack_inflight: int | None = None,
224225
backpressure_on_inflight: bool | None = None,
225-
) -> js.JetStream:
226+
) -> Future[js.JetStream]:
226227
"""Create a JetStream context.
227228
228229
:param domain: JetStream domain to use.

python/natsrpy/_natsrpy_rs/js/__init__.pyi

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from asyncio import Future
12
from datetime import datetime, timedelta
23
from typing import Any, Literal, final, overload
34

@@ -48,35 +49,35 @@ class JetStream:
4849
"""
4950

5051
@overload
51-
async def publish(
52+
def publish(
5253
self,
5354
subject: str,
5455
payload: str | bytes | bytearray | memoryview,
5556
*,
5657
headers: dict[str, str] | None = None,
5758
err_on_disconnect: bool = False,
5859
wait: Literal[True],
59-
) -> Publication: ...
60+
) -> Future[Publication]: ...
6061
@overload
61-
async def publish(
62+
def publish(
6263
self,
6364
subject: str,
6465
payload: str | bytes | bytearray | memoryview,
6566
*,
6667
headers: dict[str, str] | None = None,
6768
err_on_disconnect: bool = False,
6869
wait: Literal[False] = False,
69-
) -> None: ...
70+
) -> Future[None]: ...
7071
@overload
71-
async def publish(
72+
def publish(
7273
self,
7374
subject: str,
7475
payload: str | bytes | bytearray | memoryview,
7576
*,
7677
headers: dict[str, str] | None = None,
7778
err_on_disconnect: bool = False,
7879
wait: bool = False,
79-
) -> Publication | None: ...
80+
) -> Future[Publication | None]: ...
8081
@property
8182
def kv(self) -> KVManager:
8283
"""Manager for key-value store buckets."""
@@ -158,17 +159,17 @@ class JetStreamMessage:
158159
def token(self) -> str | None:
159160
"""Authentication token, if applicable."""
160161

161-
async def ack(self, double: bool = False) -> None:
162+
def ack(self, double: bool = False) -> Future[None]:
162163
"""Acknowledge that a message was handled.
163164
164165
:param double: whether to wait for server response, defaults to False.
165166
"""
166167

167-
async def nack(
168+
def nack(
168169
self,
169170
delay: float | timedelta | None = None,
170171
double: bool = False,
171-
) -> None:
172+
) -> Future[None]:
172173
"""Negative acknowledgement.
173174
174175
Signals that the message will not be processed now
@@ -179,7 +180,7 @@ class JetStreamMessage:
179180
:param double: whether to wait for server response, defaults to False.
180181
"""
181182

182-
async def progress(self, double: bool = False) -> None:
183+
def progress(self, double: bool = False) -> Future[None]:
183184
"""Progress acknowledgement.
184185
185186
Signals that the message is being handled right now.
@@ -189,7 +190,7 @@ class JetStreamMessage:
189190
:param double: whether to wait for server response, defaults to False.
190191
"""
191192

192-
async def next(self, double: bool = False) -> None:
193+
def next(self, double: bool = False) -> Future[None]:
193194
"""Next acknowledgement.
194195
195196
Only applies to pull consumers.
@@ -199,7 +200,7 @@ class JetStreamMessage:
199200
:param double: whether to wait for server response, defaults to False.
200201
"""
201202

202-
async def term(self, double: bool = False) -> None:
203+
def term(self, double: bool = False) -> Future[None]:
203204
"""Term acknowledgement.
204205
205206
Instructs server to stop redelivering message.

python/natsrpy/_natsrpy_rs/js/consumers.pyi

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from asyncio import Future
12
from datetime import timedelta
23
from typing import final
34

@@ -280,11 +281,11 @@ class MessagesIterator:
280281
"""Async iterator over JetStream consumer messages."""
281282

282283
def __aiter__(self) -> Self: ...
283-
async def __anext__(self) -> JetStreamMessage: ...
284-
async def next(
284+
def __anext__(self) -> Future[JetStreamMessage]: ...
285+
def next(
285286
self,
286287
timeout: float | timedelta | None = None,
287-
) -> JetStreamMessage:
288+
) -> Future[JetStreamMessage]:
288289
"""Receive the next message from the consumer.
289290
290291
:param timeout: maximum time to wait in seconds or as a timedelta,
@@ -299,7 +300,7 @@ class PushConsumer:
299300
Messages are delivered by the server to a specified subject.
300301
"""
301302

302-
async def messages(self) -> MessagesIterator:
303+
def messages(self) -> Future[MessagesIterator]:
303304
"""Get an async iterator for consuming messages.
304305
305306
:return: an async iterator over JetStream messages.
@@ -312,7 +313,7 @@ class PullConsumer:
312313
Messages are fetched on demand in batches by the client.
313314
"""
314315

315-
async def fetch(
316+
def fetch(
316317
self,
317318
max_messages: int | None = None,
318319
group: str | None = None,
@@ -323,7 +324,7 @@ class PullConsumer:
323324
min_pending: int | None = None,
324325
min_ack_pending: int | None = None,
325326
timeout: float | timedelta | None = None,
326-
) -> list[JetStreamMessage]:
327+
) -> Future[list[JetStreamMessage]]:
327328
"""Fetch a batch of messages from the consumer.
328329
329330
:param max_messages: maximum number of messages to fetch.

python/natsrpy/_natsrpy_rs/js/counters.pyi

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from asyncio import Future
12
from datetime import timedelta
23
from typing import final
34

@@ -183,12 +184,12 @@ class Counters:
183184
``allow_message_counter`` enabled.
184185
"""
185186

186-
async def add(
187+
def add(
187188
self,
188189
key: str,
189190
value: int,
190191
timeout: float | timedelta | None = None,
191-
) -> int:
192+
) -> Future[int]:
192193
"""Add an arbitrary value to a counter.
193194
194195
:param key: subject key identifying the counter.
@@ -198,11 +199,11 @@ class Counters:
198199
:return: the new counter value after the addition.
199200
"""
200201

201-
async def incr(
202+
def incr(
202203
self,
203204
key: str,
204205
timeout: float | timedelta | None = None,
205-
) -> int:
206+
) -> Future[int]:
206207
"""Increment a counter by one.
207208
208209
Shorthand for ``add(key, 1)``.
@@ -213,11 +214,11 @@ class Counters:
213214
:return: the new counter value after the increment.
214215
"""
215216

216-
async def decr(
217+
def decr(
217218
self,
218219
key: str,
219220
timeout: float | timedelta | None = None,
220-
) -> int:
221+
) -> Future[int]:
221222
"""Decrement a counter by one.
222223
223224
Shorthand for ``add(key, -1)``.
@@ -228,11 +229,11 @@ class Counters:
228229
:return: the new counter value after the decrement.
229230
"""
230231

231-
async def get(
232+
def get(
232233
self,
233234
key: str,
234235
timeout: float | timedelta | None = None,
235-
) -> CounterEntry:
236+
) -> Future[CounterEntry]:
236237
"""Retrieve the current value of a counter.
237238
238239
:param key: subject key identifying the counter.

0 commit comments

Comments
 (0)