Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions reflex/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,9 @@ class EnvironmentVariables:
# Whether to opportunistically hold the redis lock to allow fast in-memory access while uncontended.
REFLEX_OPLOCK_ENABLED: EnvVar[bool] = env_var(False)

# How long to opportunistically hold the redis lock in milliseconds (must be less than the token expiration).
REFLEX_OPLOCK_HOLD_TIME_MS: EnvVar[int] = env_var(0)


environment = EnvironmentVariables()

Expand Down
21 changes: 20 additions & 1 deletion reflex/istate/manager/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,17 @@ def _default_lock_warning_threshold() -> int:
return get_config().redis_lock_warning_threshold


def _default_oplock_hold_time_ms() -> int:
"""Get the default opportunistic lock hold time.

Returns:
The default opportunistic lock hold time.
"""
return environment.REFLEX_OPLOCK_HOLD_TIME_MS.get() or (
_default_lock_expiration() // 2
)
Comment on lines +59 to +61
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Add a comment explaining the time calculation in human-readable terms, e.g., "# Default to 50% of lock expiration (e.g., 5000ms for 10000ms expiration)"

Suggested change
return environment.REFLEX_OPLOCK_HOLD_TIME_MS.get() or (
_default_lock_expiration() // 2
)
return environment.REFLEX_OPLOCK_HOLD_TIME_MS.get() or (
_default_lock_expiration() // 2 # Default to 50% of lock expiration (e.g., 5000ms for 10000ms expiration)
)

Context Used: Rule from dashboard - When using time-based calculations in code, include comments that explain the time duration in human... (source)

Prompt To Fix With AI
This is a comment left during a code review.
Path: reflex/istate/manager/redis.py
Line: 59:61

Comment:
**style:** Add a comment explaining the time calculation in human-readable terms, e.g., "# Default to 50% of lock expiration (e.g., 5000ms for 10000ms expiration)"

```suggestion
    return environment.REFLEX_OPLOCK_HOLD_TIME_MS.get() or (
        _default_lock_expiration() // 2  # Default to 50% of lock expiration (e.g., 5000ms for 10000ms expiration)
    )
```

**Context Used:** Rule from `dashboard` - When using time-based calculations in code, include comments that explain the time duration in human... ([source](https://app.greptile.com/review/custom-context?memory=f12765cb-0537-4be8-81ad-061314e0e5d0))

How can I resolve this? If you propose a fix, please make it concise.



SMR = f"[SMR:{os.getpid()}]"
start = time.monotonic()

Expand Down Expand Up @@ -85,6 +96,11 @@ class StateManagerRedis(StateManager):
default_factory=_default_lock_warning_threshold
)

# How long to opportunistically hold the redis lock in milliseconds (must be less than the token expiration).
oplock_hold_time_ms: int = dataclasses.field(
default_factory=_default_oplock_hold_time_ms
)

# The keyspace subscription string when redis is waiting for lock to be released.
_redis_notify_keyspace_events: str = dataclasses.field(
default="K" # Enable keyspace notifications (target a particular key)
Expand Down Expand Up @@ -154,6 +170,9 @@ def __post_init__(self):
if self.lock_warning_threshold >= (lock_expiration := self.lock_expiration):
msg = f"The lock warning threshold({self.lock_warning_threshold}) must be less than the lock expiration time({lock_expiration})."
raise InvalidLockWarningThresholdError(msg)
if self._oplock_enabled and self.oplock_hold_time_ms >= lock_expiration:
msg = f"The opportunistic lock hold time({self.oplock_hold_time_ms}) must be less than the lock expiration time({lock_expiration})."
raise InvalidLockWarningThresholdError(msg)
with contextlib.suppress(RuntimeError):
asyncio.get_running_loop() # Check if we're in an event loop.
self._ensure_lock_task()
Expand Down Expand Up @@ -620,7 +639,7 @@ async def do_flush() -> None:
async def lease_breaker():
cancelled_error: asyncio.CancelledError | None = None
async with cleanup_ctx:
lease_break_time = (self.lock_expiration * 0.8) / 1000
lease_break_time = self.oplock_hold_time_ms / 1000
if self._debug_enabled:
console.debug(
f"{SMR} [{time.monotonic() - start:.3f}] {client_token} lease breaker {lock_id.decode()} started, sleeping for {lease_break_time}s"
Expand Down
Loading