3636from collections .abc import Container , Mapping
3737from struct import Struct
3838
39- from aiorpcx import TaskGroup
39+ import aiorpcx
4040
4141
4242# Use system-compiled JSON lib if available, fallback to stdlib
@@ -359,7 +359,7 @@ def pack_varbytes(data):
359359 return pack_varint (len (data )) + data
360360
361361
362- class OldTaskGroup (TaskGroup ):
362+ class OldTaskGroup (aiorpcx . TaskGroup ):
363363 """Automatically raises exceptions on join; as in aiorpcx prior to version 0.20"""
364364 async def join (self ):
365365 if self ._wait is all :
@@ -379,3 +379,35 @@ async def join(self):
379379 await super ().join ()
380380 if self .completed :
381381 self .completed .result ()
382+
383+
384+ # We monkey-patch aiorpcx TimeoutAfter (used by timeout_after and ignore_after API),
385+ # to fix a timing issue present in asyncio as a whole re timing out tasks.
386+ # To see the issue we are trying to fix, consider example:
387+ # async def outer_task():
388+ # async with timeout_after(0.1):
389+ # await inner_task()
390+ # When the 0.1 sec timeout expires, inner_task will get cancelled by timeout_after (=internal cancellation).
391+ # If around the same time (in terms of event loop iterations) another coroutine
392+ # cancels outer_task (=external cancellation), there will be a race.
393+ # Both cancellations work by propagating a CancelledError out to timeout_after, which then
394+ # needs to decide (in TimeoutAfter.__aexit__) whether it's due to an internal or external cancellation.
395+ # AFAICT asyncio provides no reliable way of distinguishing between the two.
396+ # This patch tries to always give priority to external cancellations.
397+ # see https://github.com/kyuupichan/aiorpcX/issues/44
398+ # see https://github.com/aio-libs/async-timeout/issues/229
399+ # see https://bugs.python.org/issue42130 and https://bugs.python.org/issue45098
400+ def _aiorpcx_monkeypatched_set_new_deadline (task , deadline ):
401+ def timeout_task ():
402+ task ._orig_cancel ()
403+ task ._timed_out = None if getattr (task , "_externally_cancelled" , False ) else deadline
404+ def mycancel (* args , ** kwargs ):
405+ task ._orig_cancel (* args , ** kwargs )
406+ task ._externally_cancelled = True
407+ task ._timed_out = None
408+ if not hasattr (task , "_orig_cancel" ):
409+ task ._orig_cancel = task .cancel
410+ task .cancel = mycancel
411+ task ._deadline_handle = task ._loop .call_at (deadline , timeout_task )
412+
413+ aiorpcx .curio ._set_new_deadline = _aiorpcx_monkeypatched_set_new_deadline
0 commit comments