Conversation
When an update is emitted for a token, but the websocket for that token is on another instance of the app, post it to the lost+found channel where other instances are listening for updates to send to their clients.
Set the groundwork for being able to broadcast updates to all connected states.
For more efficient and fair lock queueing, each StateManagerRedis uses a single task to monitor the keyspace for lock release/expire and then wakes up the next caller that was waiting in the queue (no fairness between separate processes though). Now lockers will wait for an `asyncio.Event` which is set by the redis pubsub waiter. If any locker waits longer than the lock_expiration, it will just try to get the lock in case there was some mixup with the pub/sub, the locker won't be blocked forever.
* When taking a lock from redis, hold it for 80% of the lock expiration timeout * While the lock is held, other events processed against the instance will use the cached in-memory copy of the state. * When the timeout expires or another process signals intention to access a locked state, flush the modifed states to redis and release the lock. Set REFLEX_OPLOCK_ENABLED=1 to use this feature
CodSpeed Performance ReportMerging #5932 will not alter performanceComparing Summary
|
This was referenced Oct 27, 2025
…senf/redis_oplock
Always check redis for contended leases before granting a lease. It's a bit slower, but much more reliable and avoids racy lock_expiration timeouts when contention occurs before the lease is created or when the pubsub hasn't caught up to reality. Always start _lock_update_task in __post_init__ to avoid race where the lease is granted, then contended, but the pubsub task hasn't started to catch the contention.
Contributor
There was a problem hiding this comment.
Greptile Overview
Greptile Summary
This PR implements an opportunistic locking (oplock) mechanism for the Redis state manager to improve performance when there's no lock contention. The key change is that when REFLEX_OPLOCK_ENABLED=1 is set, the state manager holds Redis locks for 80% of the expiration timeout and caches states in memory, allowing subsequent operations to avoid Redis round-trips entirely.
Key Changes:
- Holds locks for 80% of
lock_expirationtime to enable fast in-memory state access during uncontended periods - Uses Redis pubsub keyspace notifications to detect lock contention and flush cached states when other processes need access
- Adds comprehensive test coverage including mock Redis implementation and integration tests
- Gracefully handles cancellation and ensures state consistency by shielding flush operations from cancellation
- Maintains backward compatibility - feature is opt-in via environment variable
Implementation Quality:
- Well-structured with extensive test coverage (8 new test cases covering basic ops, contention, cancellation, and edge cases)
- Proper error handling and cleanup logic with
asyncio.shieldto prevent data loss - CI updated to test both with and without oplock enabled
- Good separation of concerns with
_try_modify_statehandling retry logic
Confidence Score: 4/5
- This PR is generally safe to merge with some considerations for production deployment
- Score of 4 reflects solid implementation with comprehensive testing, but this is a complex concurrency feature that introduces new failure modes. The oplock mechanism is well-tested and properly handles contention, cancellation, and edge cases. However, it's a significant architectural change to the state management system that could have unexpected interactions in production under high load or network issues. The feature is appropriately gated behind an environment variable and thoroughly tested in CI. One minor style suggestion was provided for code clarity.
- Pay close attention to
reflex/istate/manager/redis.py- this is the core implementation with complex async logic and lock management
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| reflex/istate/manager/redis.py | 4/5 | Implements opportunistic locking (oplock) for Redis state manager. Holds locks for 80% of expiration time to enable fast in-memory access when uncontended. Adds pubsub-based lock contention detection and local state caching. |
| reflex/environment.py | 5/5 | Adds two environment variables: REFLEX_REDIS_STATE_MANAGER_DEBUG for debug logging and REFLEX_OPLOCK_ENABLED to enable opportunistic locking feature. |
| tests/units/mock_redis.py | 5/5 | New mock Redis implementation supporting pubsub, keyspace notifications, and set operations. Also provides real_redis helper for integration testing. |
| tests/units/istate/manager/test_redis.py | 5/5 | Comprehensive test coverage for Redis state manager oplock feature. Tests basic operations, lock contention, lease management, cancellation handling, and substate fetching. |
| .github/workflows/unit_tests.yml | 5/5 | Adds Redis service container and additional test run with REFLEX_OPLOCK_ENABLED=true to verify oplock functionality in CI. |
Sequence Diagram
sequenceDiagram
participant Client1 as Client 1 (Process A)
participant SM1 as StateManager 1
participant Redis
participant PubSub as Redis PubSub
participant SM2 as StateManager 2
participant Client2 as Client 2 (Process B)
Note over Client1,Client2: Scenario: Oplock enabled, uncontended access
Client1->>SM1: modify_state(token)
SM1->>Redis: SET token_lock (NX, PX=lock_expiration)
Redis-->>SM1: OK (lock acquired)
SM1->>Redis: GET state data
Redis-->>SM1: state
SM1->>SM1: Cache state in _cached_states
SM1->>SM1: Create lease_breaker task (sleep 80% of lock_expiration)
SM1-->>Client1: yield cached_state
Client1->>Client1: Modify state in memory
Client1->>SM1: Exit context (fast, no Redis write)
Note over SM1: Lock held, state cached for subsequent calls
Client1->>SM1: modify_state(token) [2nd call]
SM1->>SM1: Check _cached_states
SM1-->>Client1: yield cached_state (no Redis lock!)
Client1->>Client1: Modify state in memory
Client1->>SM1: Exit context (fast, no Redis write)
Note over Client2,SM2: Contention scenario begins
Client2->>SM2: modify_state(token)
SM2->>Redis: SADD token_lock_waiters (signal contention)
Redis->>PubSub: keyspace event: sadd
PubSub->>SM1: Lock contention detected
SM1->>SM1: Cancel lease_breaker task
SM1->>Redis: SET state data (flush cached state)
SM1->>Redis: DEL token_lock (release)
Redis->>PubSub: keyspace event: del
PubSub->>SM2: Lock released notification
SM2->>Redis: SET token_lock (acquire)
Redis-->>SM2: OK
SM2->>Redis: GET state data
Redis-->>SM2: state (with Client1's changes)
SM2->>SM2: Cache state, create new lease_breaker
SM2-->>Client2: yield state
Client2->>Client2: Modify state
Client2->>SM2: Exit context
Note over SM1,SM2: Lease timeout scenario (no contention)
SM1->>SM1: lease_breaker wakes after 80% timeout
SM1->>Redis: SET state data (flush)
SM1->>Redis: DEL token_lock (release)
SM1->>SM1: Clear _cached_states[token]
10 files reviewed, 1 comment
No point in continually spamming "no running event loop" to the console.
adhami3310
approved these changes
Nov 4, 2025
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
the cached in-memory copy of the state.
locked state, flush the modifed states to redis and release the lock.
Set REFLEX_OPLOCK_ENABLED=1 to use this feature
Increases overall test coverage by ~1% without REFLEX_REDIS_URL