@@ -59,6 +59,10 @@ def __init__(self, api_key, connection_limit, rate_limit, rate_period, version=N
5959 self .rate_limit = rate_limit
6060 self .rate_period = rate_period
6161
62+ async def bound_fetch (self , sem , endpoint , session , throttler ):
63+ async with sem :
64+ return await self .execute (endpoint , session , throttler )
65+
6266 async def endpoint_execute (self , endpoints ):
6367 """Asynchronously calls each endpoint and returns the JSON responses
6468 Args:
@@ -75,9 +79,12 @@ async def endpoint_execute(self, endpoints):
7579
7680 # Asnycio create tasks for each endpoint
7781 try :
78- tasks = (asyncio .create_task (self .execute (endpoint , session , throttler )) for endpoint in endpoints )
79- ret = [await t
80- for t in tqdm .tqdm (self .limited_as_completed (tasks , self .connection_limit ), total = len (endpoints ))]
82+
83+ sem = asyncio .Semaphore (self .connection_limit )
84+ tasks = [asyncio .create_task (self .bound_fetch (sem , endpoint , session , throttler )) for endpoint in endpoints ]
85+
86+ [await f for f in tqdm .tqdm (asyncio .as_completed (tasks ), total = len (endpoints ))]
87+ ret = [t .result () for t in tasks ]
8188
8289 finally :
8390 await session .close ()
@@ -220,39 +227,3 @@ def _network_error(options, rate_limit, error=None, status=None, message=None):
220227 503 : e .OfflineError (message = formatted , attachments = {"options" : options , "rate_limit" : rate_limit }),
221228 }.get (status ,
222229 e .UnknownError (message = formatted , attachments = {"options" : options , "rate_limit" : rate_limit }))
223-
224- @staticmethod
225- def limited_as_completed (coros , limit ):
226- """
227- Run the coroutines (or futures) supplied in the
228- iterable coros, ensuring that there are at most
229- limit coroutines running at any time.
230- Return an iterator whose values, when waited for,
231- are Future instances containing the results of
232- the coroutines.
233- Results may be provided in any order, as they
234- become available.
235-
236- https://github.com/andybalaam/asyncioplus/blob/master/asyncioplus/limited_as_completed.py
237- """
238- futures = [
239- asyncio .ensure_future (c )
240- for c in islice (coros , 0 , limit )
241- ]
242-
243- async def first_to_finish ():
244- while True :
245- await asyncio .sleep (0 )
246- for f in futures :
247- if f .done ():
248- futures .remove (f )
249- try :
250- newf = next (coros )
251- futures .append (
252- asyncio .ensure_future (newf ))
253- except StopIteration as e :
254- pass
255- return f .result ()
256-
257- while len (futures ) > 0 :
258- yield first_to_finish ()
0 commit comments