Skip to content

Commit d059683

Browse files
committed
replace NoopSemaphore() with Semaphore(sys.maxsize)
1 parent 08d9f19 commit d059683

5 files changed

Lines changed: 8 additions & 66 deletions

File tree

streamable/_aiterators.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from asyncio.futures import Future
33
from contextlib import suppress
44
import datetime
5+
import sys
56
import time
67
from abc import ABC, abstractmethod
78
from collections import defaultdict, deque
@@ -37,7 +38,7 @@
3738
FIFOFutureResults,
3839
FutureResults,
3940
)
40-
from streamable._tools._async import AsyncFunction, NoopSemaphore, empty_aiter
41+
from streamable._tools._async import AsyncFunction, anext, empty_aiter
4142
from streamable._tools._context import noop_context_manager
4243
from streamable._tools._error import ExceptionContainer, RaisingAsyncIterator
4344

@@ -62,7 +63,7 @@ def __init__(
6263
up_to: Optional[int],
6364
) -> None:
6465
self.iterator = iterator
65-
self.up_to = up_to
66+
self.up_to = up_to or sys.maxsize
6667
self._buffer: "Optional[asyncio.Queue[Union[T, ExceptionContainer]]]" = None
6768
self._slots: Optional[asyncio.Semaphore] = None
6869
self._stopped = False
@@ -76,9 +77,7 @@ def _lazy_buffer(self) -> "asyncio.Queue[Union[T, ExceptionContainer]]":
7677
@property
7778
def _lazy_slots(self) -> asyncio.Semaphore:
7879
if not self._slots:
79-
self._slots = (
80-
asyncio.Semaphore(self.up_to) if self.up_to else NoopSemaphore()
81-
)
80+
self._slots = asyncio.Semaphore(self.up_to)
8281
return self._slots
8382

8483
async def _buffer_upstream(self) -> None:

streamable/_iterators.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import datetime
22
import queue
3+
import sys
34
from threading import Semaphore, Thread
45
import time
56
from abc import ABC, abstractmethod
@@ -27,7 +28,6 @@
2728

2829
from streamable._tools._context import noop_context_manager
2930
from streamable._tools._observation import Observation
30-
from streamable._tools._threading import NoopSemaphore
3131
from streamable._tools._sentinel import STOP_ITERATION
3232
from streamable._tools._validation import validate_sync_flatten_iterable
3333

@@ -59,8 +59,9 @@ def __init__(
5959
up_to: Optional[int],
6060
) -> None:
6161
self.iterator = iterator
62+
up_to = up_to or sys.maxsize
6263
self._buffer: "queue.Queue[Union[T, ExceptionContainer]]" = queue.Queue()
63-
self._slots = Semaphore(up_to) if up_to else NoopSemaphore()
64+
self._slots = Semaphore(up_to)
6465
self._stopped = False
6566

6667
def _buffer_upstream(self) -> None:

streamable/_tools/_async.py

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,4 @@
1-
from asyncio import Semaphore
2-
from typing import (
3-
Any,
4-
AsyncIterator,
5-
Awaitable,
6-
Callable,
7-
Coroutine,
8-
Literal,
9-
TypeVar,
10-
)
1+
from typing import Any, AsyncIterator, Awaitable, Callable, Coroutine, TypeVar
112

123
T = TypeVar("T")
134
R = TypeVar("R")
@@ -27,19 +18,3 @@ async def awaitable_to_coroutine(aw: Awaitable[T]) -> T:
2718
async def empty_aiter() -> AsyncIterator[Any]:
2819
return
2920
yield # pragma: no cover
30-
31-
32-
class NoopSemaphore(Semaphore):
33-
__slots__ = ()
34-
35-
def __init__(self) -> None:
36-
pass
37-
38-
async def acquire(self) -> Literal[True]:
39-
return True
40-
41-
def release(self) -> None:
42-
return
43-
44-
def locked(self) -> bool:
45-
return False

streamable/_tools/_threading.py

Lines changed: 0 additions & 17 deletions
This file was deleted.

tests/test_tools.py

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
import pytest
33
from streamable._tools._func import sidify
44
from streamable._tools._logging import logfmt_str_escape
5-
from streamable._tools import _async, _threading
65

76
from typing import Any, Callable, List
87

@@ -89,18 +88,3 @@ def test_logfmt_str_escape():
8988
assert logfmt_str_escape("in ts") == '"in ts"'
9089
assert logfmt_str_escape("in\\ts") == r'"in\\ts"'
9190
assert logfmt_str_escape('"ints"') == r'"\"ints\""'
92-
93-
94-
def test_noop_semaphore() -> None:
95-
semaphore = _threading.NoopSemaphore()
96-
assert semaphore.acquire()
97-
semaphore.release()
98-
assert not semaphore.locked()
99-
100-
101-
@pytest.mark.asyncio
102-
async def test_noop_semaphore_async() -> None:
103-
semaphore = _async.NoopSemaphore()
104-
assert await semaphore.acquire()
105-
semaphore.release()
106-
assert not semaphore.locked()

0 commit comments

Comments
 (0)