@@ -22,6 +22,14 @@ class RedisTestState(BaseState):
2222 count : int = 0
2323
2424
25+ class SubState1 (RedisTestState ):
26+ """A test substate for redis state manager tests."""
27+
28+
29+ class SubState2 (RedisTestState ):
30+ """A test substate for redis state manager tests."""
31+
32+
2533@pytest .fixture
2634def root_state () -> type [RedisTestState ]:
2735
@@ -65,6 +73,22 @@ def event_log(state_manager_redis: StateManagerRedis) -> list[dict[str, Any]]:
6573 return state_manager_redis .redis ._internals ["event_log" ] # pyright: ignore[reportAttributeAccessIssue]
6674
6775
76+ @pytest .fixture
77+ def event_log_on_update (state_manager_redis : StateManagerRedis ) -> asyncio .Event :
78+ """Get the event for new event records being added to the redis event log.
79+
80+ Test is responsible for calling `.clear` before an operation when it needs
81+ to detect a new event added afterward.
82+
83+ Args:
84+ state_manager_redis: The StateManagerRedis.
85+
86+ Returns:
87+ The event that is set when new events are added to the redis event log.
88+ """
89+ return state_manager_redis .redis ._internals ["event_log_on_update" ] # pyright: ignore[reportAttributeAccessIssue]
90+
91+
6892@pytest .mark .asyncio
6993async def test_basic_get_set (
7094 state_manager_redis : StateManagerRedis ,
@@ -123,13 +147,15 @@ async def test_modify_oplock(
123147 state_manager_redis : StateManagerRedis ,
124148 root_state : type [RedisTestState ],
125149 event_log : list [dict [str , Any ]],
150+ event_log_on_update : asyncio .Event ,
126151):
127152 """Test modifying state with StateManagerRedis with optimistic locking.
128153
129154 Args:
130155 state_manager_redis: The StateManagerRedis to test.
131156 root_state: The root state class.
132157 event_log: The redis event log.
158+ event_log_on_update: The event for new event records being added to the redis event log.
133159 """
134160 token = str (uuid .uuid4 ())
135161
@@ -143,6 +169,8 @@ async def test_modify_oplock(
143169 state_manager_2 ._debug_enabled = True
144170 state_manager_2 ._oplock_enabled = True
145171
172+ event_log_on_update .clear ()
173+
146174 # Initial modify should set count to 1
147175 async with state_manager_redis .modify_state (
148176 _substate_key (token , root_state ),
@@ -159,6 +187,7 @@ async def test_modify_oplock(
159187 assert state_lock_1 is not None
160188 assert not state_lock_1 .locked ()
161189
190+ await event_log_on_update .wait ()
162191 lock_events_before = len ([
163192 ev
164193 for ev in event_log
@@ -182,6 +211,7 @@ async def test_modify_oplock(
182211 assert lock_events_before == lock_events_after
183212
184213 # Contend the lock from another state manager
214+ event_log_on_update .clear ()
185215 async with state_manager_2 .modify_state (
186216 _substate_key (token , root_state ),
187217 ) as new_state :
@@ -203,6 +233,7 @@ async def test_modify_oplock(
203233 assert token not in state_manager_redis ._cached_states
204234
205235 # There should have been another redis lock taken.
236+ await event_log_on_update .wait ()
206237 lock_events_after_2 = len ([
207238 ev
208239 for ev in event_log
@@ -228,7 +259,9 @@ async def test_modify_oplock(
228259 assert token_set_events == 1
229260
230261 # Now close the contender to release its lease.
262+ event_log_on_update .clear ()
231263 await state_manager_2 .close ()
264+ await event_log_on_update .wait ()
232265
233266 # Both locks should have been released.
234267 unlock_events = len ([
@@ -562,13 +595,6 @@ async def test_oplock_fetch_substate(
562595 root_state: The root state class.
563596 event_log: The redis event log.
564597 """
565-
566- class SubState1 (root_state ):
567- pass
568-
569- class SubState2 (root_state ):
570- pass
571-
572598 token = str (uuid .uuid4 ())
573599
574600 state_manager_redis ._debug_enabled = True
@@ -627,6 +653,7 @@ async def test_oplock_hold_oplock_after_cancel(
627653 state_manager_redis : StateManagerRedis ,
628654 root_state : type [RedisTestState ],
629655 event_log : list [dict [str , Any ]],
656+ event_log_on_update : asyncio .Event ,
630657 short_lock_expiration : int ,
631658):
632659 """Test that cancelling a modify does not release the oplock prematurely.
@@ -635,6 +662,7 @@ async def test_oplock_hold_oplock_after_cancel(
635662 state_manager_redis: The StateManagerRedis to test.
636663 root_state: The root state class.
637664 event_log: The redis event log.
665+ event_log_on_update: The event log update event.
638666 short_lock_expiration: The lock expiration time in milliseconds.
639667 """
640668 token = str (uuid .uuid4 ())
@@ -683,13 +711,15 @@ async def modify():
683711 await lease_task
684712
685713 # Modify the state again, this should get a new lock and lease
714+ event_log_on_update .clear ()
686715 async with state_manager_redis .modify_state (
687716 _substate_key (token , root_state ),
688717 ) as new_state :
689718 assert isinstance (new_state , root_state )
690719 new_state .count += 1
691720
692721 # There should have been two redis lock acquisitions.
722+ await event_log_on_update .wait ()
693723 lock_events = len ([
694724 ev
695725 for ev in event_log
0 commit comments