Skip to content
6 changes: 6 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ dependencies = [
"typing_extensions >=4.13.0",
"wrapt >=1.17.0,<2.0",
]

classifiers = [
"Development Status :: 4 - Beta",
"License :: OSI Approved :: Apache Software License",
Expand All @@ -49,6 +50,10 @@ classifiers = [
"Programming Language :: Python :: 3.13",
]

[project.optional-dependencies]
monitoring = [
"pyleak >=0.1.14,<1.0",
]

[project.urls]
homepage = "https://reflex.dev"
Expand All @@ -74,6 +79,7 @@ dev = [
"pre-commit",
"psutil",
"psycopg[binary]",
"pyleak >=0.1.14,<1.0",
"pyright",
"pytest-asyncio",
"pytest-benchmark",
Expand Down
15 changes: 15 additions & 0 deletions reflex/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
from reflex.utils import console
from reflex.utils.exceptions import ConfigError

if TYPE_CHECKING:
from pyleak.base import LeakAction


@dataclasses.dataclass(kw_only=True)
class DBConfig:
Expand Down Expand Up @@ -186,6 +189,18 @@ class BaseConfig:
# Telemetry opt-in.
telemetry_enabled: bool = True

# PyLeak monitoring configuration for detecting event loop blocking and resource leaks.
enable_pyleak_monitoring: bool = False

# Threshold in seconds for detecting event loop blocking operations.
pyleak_blocking_threshold: float = 0.1

# Grace period in seconds for thread leak detection cleanup.
pyleak_thread_grace_period: float = 0.2

# Action to take when PyLeak detects issues
pyleak_action: "LeakAction | None" = None

# The bun path
bun_path: ExistingPath = constants.Bun.DEFAULT_PATH

Expand Down
7 changes: 6 additions & 1 deletion reflex/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
)
from reflex.utils.exceptions import ImmutableStateError as ImmutableStateError
from reflex.utils.exec import is_testing_env
from reflex.utils.monitoring import is_pyleak_enabled, monitor_loopblocks
from reflex.utils.types import _isinstance, is_union, value_inside_optional
from reflex.vars import Field, VarData, field
from reflex.vars.base import (
Expand Down Expand Up @@ -1784,7 +1785,11 @@ async def _process_event(
from reflex.utils import telemetry

# Get the function to process the event.
fn = functools.partial(handler.fn, state)
if is_pyleak_enabled():
console.debug(f"Monitoring leaks for handler: {handler.fn.__qualname__}")
fn = functools.partial(monitor_loopblocks(handler.fn), state)
else:
fn = functools.partial(handler.fn, state)

try:
type_hints = typing.get_type_hints(handler.fn)
Expand Down
180 changes: 180 additions & 0 deletions reflex/utils/monitoring.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
"""PyLeak integration for monitoring event loop blocking and resource leaks in Reflex applications."""

import asyncio
import contextlib
import functools
import inspect
import threading
from collections.abc import AsyncGenerator, Awaitable, Callable, Generator
from typing import TypeVar, overload

from reflex.config import get_config

try:
from pyleak import no_event_loop_blocking, no_task_leaks, no_thread_leaks
from pyleak.base import LeakAction

PYLEAK_AVAILABLE = True
except ImportError:
PYLEAK_AVAILABLE = False
no_event_loop_blocking = no_task_leaks = no_thread_leaks = None # pyright: ignore[reportAssignmentType]
LeakAction = None # pyright: ignore[reportAssignmentType]


# Thread-local storage to track if monitoring is already active
_thread_local = threading.local()


def is_pyleak_enabled() -> bool:
"""Check if PyLeak monitoring is enabled and available.

Returns:
True if PyLeak monitoring is enabled in config and PyLeak is available.
"""
if not PYLEAK_AVAILABLE:
return False
config = get_config()
return config.enable_pyleak_monitoring


@contextlib.contextmanager
def monitor_sync():
"""Sync context manager for PyLeak monitoring.

Yields:
None: Context for monitoring sync operations.
"""
if not is_pyleak_enabled():
yield
return

# Check if monitoring is already active in this thread
if getattr(_thread_local, "monitoring_active", False):
yield
return

config = get_config()
action = config.pyleak_action or LeakAction.WARN # pyright: ignore[reportOptionalMemberAccess]

# Mark monitoring as active
_thread_local.monitoring_active = True
try:
with contextlib.ExitStack() as stack:
# Thread leak detection has issues with background tasks (no_thread_leaks)
stack.enter_context(
no_event_loop_blocking( # pyright: ignore[reportOptionalCall]
action=action,
threshold=config.pyleak_blocking_threshold,
)
)
yield
finally:
_thread_local.monitoring_active = False


@contextlib.asynccontextmanager
async def monitor_async():
"""Async context manager for PyLeak monitoring.

Yields:
None: Context for monitoring async operations.
"""
if not is_pyleak_enabled():
yield
return

# Check if monitoring is already active in this thread
if getattr(_thread_local, "monitoring_active", False):
yield
return

config = get_config()
action = config.pyleak_action or LeakAction.WARN # pyright: ignore[reportOptionalMemberAccess]

# Mark monitoring as active
_thread_local.monitoring_active = True
try:
async with contextlib.AsyncExitStack() as stack:
# Thread leak detection has issues with background tasks (no_thread_leaks)
# Re-add thread leak later.

# Block detection for event loops
stack.enter_context(
no_event_loop_blocking( # pyright: ignore[reportOptionalCall]
action=action,
threshold=config.pyleak_blocking_threshold,
)
)
# Task leak detection has issues with background tasks (no_task_leaks)

yield
finally:
_thread_local.monitoring_active = False


YieldType = TypeVar("YieldType")
SendType = TypeVar("SendType")
ReturnType = TypeVar("ReturnType")


@overload
def monitor_loopblocks(
func: Callable[..., AsyncGenerator[YieldType, ReturnType]],
) -> Callable[..., AsyncGenerator[YieldType, ReturnType]]: ...


@overload
def monitor_loopblocks(
func: Callable[..., Generator[YieldType, SendType, ReturnType]],
) -> Callable[..., Generator[YieldType, SendType, ReturnType]]: ...


@overload
def monitor_loopblocks(
func: Callable[..., Awaitable[ReturnType]],
) -> Callable[..., Awaitable[ReturnType]]: ...


def monitor_loopblocks(func: Callable) -> Callable:
"""Framework decorator using the monitoring module's context manager.

Args:
func: The function to be monitored for leaks.

Returns:
Decorator function that applies PyLeak monitoring to sync/async functions.
"""
if inspect.isasyncgenfunction(func):

@functools.wraps(func)
async def async_gen_wrapper(*args, **kwargs):
async with monitor_async():
async for item in func(*args, **kwargs):
yield item

return async_gen_wrapper

if asyncio.iscoroutinefunction(func):

@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
async with monitor_async():
return await func(*args, **kwargs)

return async_wrapper

if inspect.isgeneratorfunction(func):

@functools.wraps(func)
def gen_wrapper(*args, **kwargs):
with monitor_sync():
yield from func(*args, **kwargs)

return gen_wrapper

@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
with monitor_sync():
return func(*args, **kwargs)

return sync_wrapper # pyright: ignore[reportReturnType]
Loading