Skip to content
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dependencies = [
"platformdirs >=4.3.7,<5.0",
"psutil >=7.0.0,<8.0; sys_platform == 'win32'",
"pydantic >=1.10.21,<3.0",
"pyleak >=0.1.14,<1.0",
Comment thread
Lendemor marked this conversation as resolved.
Outdated
"python-multipart >=0.0.20,<1.0",
"python-socketio >=5.12.0,<6.0",
"redis >=5.2.1,<7.0",
Expand Down
15 changes: 15 additions & 0 deletions reflex/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
from reflex.utils import console
from reflex.utils.exceptions import ConfigError

if TYPE_CHECKING:
from pyleak.base import LeakAction


@dataclasses.dataclass(kw_only=True)
class DBConfig:
Expand Down Expand Up @@ -186,6 +189,18 @@ class BaseConfig:
# Telemetry opt-in.
telemetry_enabled: bool = True

# PyLeak monitoring configuration for detecting event loop blocking and resource leaks.
enable_pyleak_monitoring: bool = False

# Threshold in seconds for detecting event loop blocking operations.
pyleak_blocking_threshold: float = 0.001

# Grace period in seconds for thread leak detection cleanup.
pyleak_thread_grace_period: float = 0.2

# Action to take when PyLeak detects issues
pyleak_action: "LeakAction | None" = None

# The bun path
bun_path: ExistingPath = constants.Bun.DEFAULT_PATH

Expand Down
15 changes: 15 additions & 0 deletions reflex/monitoring/__init__.py
Comment thread
Lendemor marked this conversation as resolved.
Outdated
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""Monitoring utilities for Reflex applications."""

from reflex.monitoring.pyleak import (
is_pyleak_enabled,
monitor_async,
monitor_leaks,
monitor_sync,
)

__all__ = [
"is_pyleak_enabled",
"monitor_async",
"monitor_leaks",
"monitor_sync",
]
109 changes: 109 additions & 0 deletions reflex/monitoring/pyleak.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
"""PyLeak integration for monitoring event loop blocking and resource leaks in Reflex applications."""

import contextlib
from collections.abc import Callable

from pyleak import no_event_loop_blocking, no_task_leaks, no_thread_leaks
from pyleak.base import LeakAction

from reflex.config import get_config


def is_pyleak_enabled() -> bool:
"""Check if PyLeak monitoring is enabled and available.

Returns:
True if PyLeak monitoring is enabled in config.
"""
config = get_config()
return config.enable_pyleak_monitoring


@contextlib.contextmanager
def monitor_sync():
"""Sync context manager for PyLeak monitoring.

Yields:
None: Context for monitoring sync operations.
"""
if not is_pyleak_enabled():
yield
return

config = get_config()
action = config.pyleak_action or LeakAction.WARN

with contextlib.ExitStack() as stack:
stack.enter_context(
no_thread_leaks(
action=action,
grace_period=config.pyleak_thread_grace_period,
)
)
stack.enter_context(
no_event_loop_blocking(
action=action,
threshold=config.pyleak_blocking_threshold,
)
)
yield


@contextlib.asynccontextmanager
async def monitor_async():
"""Async context manager for PyLeak monitoring.

Yields:
None: Context for monitoring async operations.
"""
if not is_pyleak_enabled():
yield
return

config = get_config()
action = config.pyleak_action or LeakAction.WARN

async with contextlib.AsyncExitStack() as stack:
stack.enter_context(
no_thread_leaks(
action=action,
grace_period=config.pyleak_thread_grace_period,
)
)
stack.enter_context(
no_event_loop_blocking(
action=action,
threshold=config.pyleak_blocking_threshold,
)
)
await stack.enter_async_context(no_task_leaks(action=action))
yield


def monitor_leaks():
"""Framework decorator using the monitoring module's context manager.

Returns:
Decorator function that applies PyLeak monitoring to sync/async functions.
"""
import asyncio
import functools
Comment thread
Lendemor marked this conversation as resolved.
Outdated

def decorator(func: Callable):
if asyncio.iscoroutinefunction(func):

@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
async with monitor_async():
return await func(*args, **kwargs)

return async_wrapper

@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
with monitor_sync():
return func(*args, **kwargs)

return sync_wrapper # pyright: ignore[reportReturnType]

return decorator
9 changes: 7 additions & 2 deletions reflex/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from reflex.istate.proxy import MutableProxy, StateProxy
from reflex.istate.storage import ClientStorageBase
from reflex.model import Model
from reflex.monitoring import is_pyleak_enabled, monitor_leaks
from reflex.utils import console, format, prerequisites, types
from reflex.utils.exceptions import (
ComputedVarShadowsBaseVarsError,
Expand Down Expand Up @@ -1784,7 +1785,11 @@ async def _process_event(
from reflex.utils import telemetry

# Get the function to process the event.
fn = functools.partial(handler.fn, state)
if is_pyleak_enabled():
console.debug(f"Monitoring leaks for handler: {handler.fn.__qualname__}")
fn = functools.partial(monitor_leaks()(handler.fn), state)
else:
fn = functools.partial(handler.fn, state)

try:
type_hints = typing.get_type_hints(handler.fn)
Expand Down Expand Up @@ -1869,7 +1874,7 @@ async def _process_event(

# Handle regular event chains.
else:
yield await state._as_state_update(handler, events, final=True)
yield await state._as_state_update(handler, events, final=True) # pyright: ignore[reportArgumentType]

# If an error occurs, throw a window alert.
except Exception as ex:
Expand Down