diff --git a/pyproject.toml b/pyproject.toml index e4ada4172c6..97ecabd4113 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,6 +39,7 @@ dependencies = [ "typing_extensions >=4.13.0", "wrapt >=1.17.0,<2.0", ] + classifiers = [ "Development Status :: 4 - Beta", "License :: OSI Approved :: Apache Software License", @@ -49,6 +50,10 @@ classifiers = [ "Programming Language :: Python :: 3.13", ] +[project.optional-dependencies] +monitoring = [ + "pyleak >=0.1.14,<1.0", +] [project.urls] homepage = "https://reflex.dev" @@ -74,6 +79,7 @@ dev = [ "pre-commit", "psutil", "psycopg[binary]", + "pyleak >=0.1.14,<1.0", "pyright", "pytest-asyncio", "pytest-benchmark", diff --git a/reflex/config.py b/reflex/config.py index b4970c7515e..8b961a6b88b 100644 --- a/reflex/config.py +++ b/reflex/config.py @@ -29,6 +29,9 @@ from reflex.utils import console from reflex.utils.exceptions import ConfigError +if TYPE_CHECKING: + from pyleak.base import LeakAction + @dataclasses.dataclass(kw_only=True) class DBConfig: @@ -186,6 +189,18 @@ class BaseConfig: # Telemetry opt-in. telemetry_enabled: bool = True + # PyLeak monitoring configuration for detecting event loop blocking and resource leaks. + enable_pyleak_monitoring: bool = False + + # Threshold in seconds for detecting event loop blocking operations. + pyleak_blocking_threshold: float = 0.1 + + # Grace period in seconds for thread leak detection cleanup. + pyleak_thread_grace_period: float = 0.2 + + # Action to take when PyLeak detects issues + pyleak_action: "LeakAction | None" = None + # The bun path bun_path: ExistingPath = constants.Bun.DEFAULT_PATH diff --git a/reflex/state.py b/reflex/state.py index 692f71cb6ca..a980933df08 100644 --- a/reflex/state.py +++ b/reflex/state.py @@ -60,6 +60,7 @@ ) from reflex.utils.exceptions import ImmutableStateError as ImmutableStateError from reflex.utils.exec import is_testing_env +from reflex.utils.monitoring import is_pyleak_enabled, monitor_loopblocks from reflex.utils.types import _isinstance, is_union, value_inside_optional from reflex.vars import Field, VarData, field from reflex.vars.base import ( @@ -1784,7 +1785,11 @@ async def _process_event( from reflex.utils import telemetry # Get the function to process the event. - fn = functools.partial(handler.fn, state) + if is_pyleak_enabled(): + console.debug(f"Monitoring leaks for handler: {handler.fn.__qualname__}") + fn = functools.partial(monitor_loopblocks(handler.fn), state) + else: + fn = functools.partial(handler.fn, state) try: type_hints = typing.get_type_hints(handler.fn) diff --git a/reflex/utils/monitoring.py b/reflex/utils/monitoring.py new file mode 100644 index 00000000000..7b3ce88c764 --- /dev/null +++ b/reflex/utils/monitoring.py @@ -0,0 +1,180 @@ +"""PyLeak integration for monitoring event loop blocking and resource leaks in Reflex applications.""" + +import asyncio +import contextlib +import functools +import inspect +import threading +from collections.abc import AsyncGenerator, Awaitable, Callable, Generator +from typing import TypeVar, overload + +from reflex.config import get_config + +try: + from pyleak import no_event_loop_blocking, no_task_leaks, no_thread_leaks + from pyleak.base import LeakAction + + PYLEAK_AVAILABLE = True +except ImportError: + PYLEAK_AVAILABLE = False + no_event_loop_blocking = no_task_leaks = no_thread_leaks = None # pyright: ignore[reportAssignmentType] + LeakAction = None # pyright: ignore[reportAssignmentType] + + +# Thread-local storage to track if monitoring is already active +_thread_local = threading.local() + + +def is_pyleak_enabled() -> bool: + """Check if PyLeak monitoring is enabled and available. + + Returns: + True if PyLeak monitoring is enabled in config and PyLeak is available. + """ + if not PYLEAK_AVAILABLE: + return False + config = get_config() + return config.enable_pyleak_monitoring + + +@contextlib.contextmanager +def monitor_sync(): + """Sync context manager for PyLeak monitoring. + + Yields: + None: Context for monitoring sync operations. + """ + if not is_pyleak_enabled(): + yield + return + + # Check if monitoring is already active in this thread + if getattr(_thread_local, "monitoring_active", False): + yield + return + + config = get_config() + action = config.pyleak_action or LeakAction.WARN # pyright: ignore[reportOptionalMemberAccess] + + # Mark monitoring as active + _thread_local.monitoring_active = True + try: + with contextlib.ExitStack() as stack: + # Thread leak detection has issues with background tasks (no_thread_leaks) + stack.enter_context( + no_event_loop_blocking( # pyright: ignore[reportOptionalCall] + action=action, + threshold=config.pyleak_blocking_threshold, + ) + ) + yield + finally: + _thread_local.monitoring_active = False + + +@contextlib.asynccontextmanager +async def monitor_async(): + """Async context manager for PyLeak monitoring. + + Yields: + None: Context for monitoring async operations. + """ + if not is_pyleak_enabled(): + yield + return + + # Check if monitoring is already active in this thread + if getattr(_thread_local, "monitoring_active", False): + yield + return + + config = get_config() + action = config.pyleak_action or LeakAction.WARN # pyright: ignore[reportOptionalMemberAccess] + + # Mark monitoring as active + _thread_local.monitoring_active = True + try: + async with contextlib.AsyncExitStack() as stack: + # Thread leak detection has issues with background tasks (no_thread_leaks) + # Re-add thread leak later. + + # Block detection for event loops + stack.enter_context( + no_event_loop_blocking( # pyright: ignore[reportOptionalCall] + action=action, + threshold=config.pyleak_blocking_threshold, + ) + ) + # Task leak detection has issues with background tasks (no_task_leaks) + + yield + finally: + _thread_local.monitoring_active = False + + +YieldType = TypeVar("YieldType") +SendType = TypeVar("SendType") +ReturnType = TypeVar("ReturnType") + + +@overload +def monitor_loopblocks( + func: Callable[..., AsyncGenerator[YieldType, ReturnType]], +) -> Callable[..., AsyncGenerator[YieldType, ReturnType]]: ... + + +@overload +def monitor_loopblocks( + func: Callable[..., Generator[YieldType, SendType, ReturnType]], +) -> Callable[..., Generator[YieldType, SendType, ReturnType]]: ... + + +@overload +def monitor_loopblocks( + func: Callable[..., Awaitable[ReturnType]], +) -> Callable[..., Awaitable[ReturnType]]: ... + + +def monitor_loopblocks(func: Callable) -> Callable: + """Framework decorator using the monitoring module's context manager. + + Args: + func: The function to be monitored for leaks. + + Returns: + Decorator function that applies PyLeak monitoring to sync/async functions. + """ + if inspect.isasyncgenfunction(func): + + @functools.wraps(func) + async def async_gen_wrapper(*args, **kwargs): + async with monitor_async(): + async for item in func(*args, **kwargs): + yield item + + return async_gen_wrapper + + if asyncio.iscoroutinefunction(func): + + @functools.wraps(func) + async def async_wrapper(*args, **kwargs): + async with monitor_async(): + return await func(*args, **kwargs) + + return async_wrapper + + if inspect.isgeneratorfunction(func): + + @functools.wraps(func) + def gen_wrapper(*args, **kwargs): + with monitor_sync(): + yield from func(*args, **kwargs) + + return gen_wrapper + + @functools.wraps(func) + def sync_wrapper(*args, **kwargs): + with monitor_sync(): + return func(*args, **kwargs) + + return sync_wrapper # pyright: ignore[reportReturnType]