Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ jobs:
- '3.12'
- '3.13'
- '3.14'
- 'pypy-3.10'
- 'pypy-3.11'

steps:
- uses: actions/checkout@v5
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ sock2 = zmq_anyio.Socket(sock2)

async def main():
async with sock1, sock2: # use an async context manager
await sock1.asend(b"Hello").wait() # use `asend` instead of `send`, and await the `.wait()` method
await sock1.asend(b"Hello") # use `asend` instead of `send` and await it
sock1.asend(b", World!") # or don't await it, it's sent in the background
assert await sock2.arecv().wait() == b"Hello" # use `arecv` instead of `recv`, and await the `.wait()` method
assert await sock2.arecv() == b"Hello" # use `arecv` instead of `recv`
future = sock2.arecv() # or get the future and await it later
assert await future.wait() == b", World!"
assert await future == b", World!"

anyio.run(main)
```
3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ classifiers = [
]
requires-python = ">= 3.10"
dependencies = [
"anyio >=4.13.0,<5.0.0",
"anyioutils >=0.7.4,<0.8.0",
"anyio >=4.14.0,<5.0.0",
"pyzmq >=27.1.0,<28.0.0",
]

Expand Down
209 changes: 209 additions & 0 deletions src/zmq_anyio/_future.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
from __future__ import annotations

from collections.abc import Generator
from enum import Enum, auto
from typing import Any, Generic, TypeVar

from anyio import Event

T = TypeVar("T")


class Future(Generic[T]):
"""
An awaitable object that works similar to a :class:`asyncio.Future` but
with similar characteristics to a :class:`.TaskHandle`.
"""

class Status(Enum):
"""
The status of a future handle.

.. attribute:: PENDING

The future has not finished yet.
.. attribute:: FINISHED

The future has finished with a return value.
.. attribute:: CANCELLED

The future was cancelled and has finished since.
.. attribute:: FAILED

The future raised an exception.
"""

PENDING = auto()
FINISHED = auto()
CANCELLED = auto()
FAILED = auto()

__slots__ = (
"_cancelled",
"_result_value",
"_finished_event",
"_exception",
"_name",
)
_result_value: T

def __init__(self, *, name: str | None = None) -> None:
self._finished_event = Event()
self._exception: BaseException | None = None
self._cancelled: bool = False
self._name = name

def _check_pending(self) -> None:
"""Shortcut for checking if a Future is pending

:raises FutureAlreadyFinished: if future was already given a result or exception.
:raises FutureCancelled: if future has been cancelled previously.
"""
match self.status:
case Future.Status.PENDING:
return
case Future.Status.FINISHED:
raise FutureAlreadyFinished("future has already finished")
case Future.Status.FAILED:
raise FutureAlreadyFinished("future already failed")
case Future.Status.CANCELLED:
raise FutureCancelled("future was cancelled")

async def wait(self) -> None:
"""
Waits for the future to finish.

This method will attempt to wait for a result or exception
"""
await self._finished_event.wait()

def set_result(self, value: T) -> None:
"""
Send pending result for a `.Future` object

:raises FutureAlreadyFinished: if future was already given a result or exception.
:raises FutureCancelled: if future has been cancelled previously.
"""
self._check_pending()
self._result_value = value
self._finished_event.set()

def set_exception(self, exception: BaseException) -> None:
"""Send exception for a `.Future` object

:raises FutureAlreadyFinished: if future was already given a result or exception.
:raises FutureCancelled: if future has been cancelled previously.
"""
self._check_pending()
self._exception = exception
self._finished_event.set()

def cancel(self) -> None:
"""Cancels a pending `.Future` object

Does nothing if the Future was already finished.
"""
if self.status is Future.Status.PENDING:
self._cancelled = True
self._finished_event.set()

@property
def exception(self) -> BaseException | None:
"""
The exception value of a `.Future`

:raises TaskNotFinished: if future is still pending
:raises FutureCancelled: if future was cancelled
:returns: None if future succeeds with a result sent instead
otherwise this will be an exception
"""

match self.status:
case Future.Status.PENDING:
raise TaskNotFinished("the future has not finished yet")
case Future.Status.FINISHED:
return None
case Future.Status.CANCELLED:
raise FutureCancelled("the future was cancelled")
case Future.Status.FAILED:
return self._exception

@property
def return_value(self) -> T:
"""
The return value of the future.

:raises TaskNotFinished: if the future has not finished yet
:raises FutureCancelled: if the future was cancelled
:raises TaskFailed: if the future raised an exception

"""
match self.status:
case Future.Status.PENDING:
raise TaskNotFinished("the future has not finished yet")
case Future.Status.FINISHED:
return self._result_value
case Future.Status.CANCELLED:
raise FutureCancelled("the future was cancelled")
case Future.Status.FAILED:
raise TaskFailed("the future raised an exception") from self._exception

@property
def status(self) -> Future.Status:
"""
The current status of a future.

Every future starts in the :attr:`~Future.Status.PENDING` state.
If a future is cancelled while in this state, it will transition to the
:attr:`Future.Status.CANCELLING` state. When the task finishes, it will
transition to one of the three final states (
:attr:`Future.Status.FINISHED`, :attr:`Future.Status.FAILED`, or
:attr:`Future.Status.CANCELLING`) depending on the exception the task
raised, if any. No other status transitions will happen.
"""
if not self._finished_event.is_set():
return Future.Status.PENDING
elif self._cancelled:
return Future.Status.CANCELLED
elif self._exception is not None:
return Future.Status.FAILED
else:
return Future.Status.FINISHED

def __await__(self) -> Generator[Any, Any, T]:
yield from self.wait().__await__()
return self.return_value

def __repr__(self) -> str:
return (
f"<{self.__class__.__name__} {self.status.name.lower()} "
f"name={self._name!r}>"
)


class FutureAlreadyFinished(Exception):
"""
Raised when attempting set a result of or await a
:class:`.Future` that has already completed.
"""


class FutureCancelled(Exception):
"""
Raised when attempting to access the return value or exception of a
:class:`.Future` that was cancelled.
"""


class TaskFailed(Exception):
"""
Raised when awaiting on, or attempting to access the return value of, a
:class:`.TaskHandle` that raised an exception.
"""


class TaskNotFinished(Exception):
"""
Raised when attempting to access the return value or exception of a
:class:`.TaskHandle` that is still pending completion.
"""
Loading
Loading