Skip to content

Commit 4eaf1c3

Browse files
committed
bounded wait for lock waiter's psubscribe to start
1 parent fefa026 commit 4eaf1c3

File tree

1 file changed

+10
-1
lines changed

1 file changed

+10
-1
lines changed

reflex/istate/manager/redis.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ def _default_oplock_hold_time_ms() -> int:
6161
)
6262

6363

64+
# The lock waiter task should subscribe to lock channel updates within this period.
65+
LOCK_SUBSCRIBE_TASK_TIMEOUT = 2 # seconds
66+
67+
6468
SMR = f"[SMR:{os.getpid()}]"
6569
start = time.monotonic()
6670

@@ -980,7 +984,12 @@ async def _wait_lock(self, lock_key: bytes, lock_id: bytes) -> None:
980984
# Make sure lock waiter task is running.
981985
self._ensure_lock_task()
982986
# Make sure the lock waiter is subscribed to avoid missing notifications.
983-
await self._lock_updates_subscribed.wait()
987+
await asyncio.wait_for(
988+
self._lock_updates_subscribed.wait(),
989+
timeout=min(
990+
LOCK_SUBSCRIBE_TASK_TIMEOUT, max(self.lock_expiration / 1000, 0)
991+
),
992+
)
984993
async with (
985994
self._lock_waiter(lock_key) as lock_released_event,
986995
self._request_lock_release(lock_key, lock_id),

0 commit comments

Comments
 (0)