From 67fdff403ef896633b1e431e08b766adadcf07cc Mon Sep 17 00:00:00 2001 From: waiho-gumloop Date: Mon, 29 Jun 2026 14:38:16 -0700 Subject: [PATCH 1/4] [GMLP-9206] Backport redis-py PR #3557 recursion fix Workaround for the on_connect-recursion bug in redis-py < 6.0.0b2 that crashed ~261 Celery workers across all services during the production Redis broker pod replacement on 2026-06-25 (incident GMLP-9206). Root cause: when a redis-py Connection is mid-handshake and the inner PING from check_health fails, send_packed_command calls self.connect() to recover, which re-enters on_connect -> CLIENT SETINFO -> check_health -> PING -> send_packed_command -> connect(), recursing until RecursionError crashes the worker. The upstream fix (redis-py PR #3557) splits connect into connect_check_health and threads check_health=False through the inner reconnect. It is in redis-py 6.0.0b2+ only and is NOT backported to the 5.x line. We cannot adopt the upstream fix without a coordinated upgrade: - kombu 5.5.x pins redis<=5.2.1 - celery 5.5.x pins kombu<5.6 - the gumloop-celery fork would need to be rebased on celery 5.6+ (which also requires forward-porting rbehal/celery's spawn pool work and our --disable-prefetch / GMLP-9012 PRs) Until that upgrade lands, this patches Connection.send_packed_command to raise ConnectionError instead of silently calling self.connect() when the socket is gone. Higher-level callers (kombu Channel, redis.client, application pools) already handle ConnectionError with proper retry, so the failure surfaces cleanly. 
Patch is applied via import side effect from celery/__init__.py and is a no-op when: - redis-py is not installed - redis-py >= 6.0.0b2 (detected via hasattr(Connection, 'connect_check_health')) References: - redis-py PR #3557: https://github.com/redis/redis-py/pull/3557 - redis-py issue #3745: https://github.com/redis/redis-py/issues/3745 - Linear GMLP-9206 --- celery/__init__.py | 4 ++ celery/_redis_py_recursion_workaround.py | 89 ++++++++++++++++++++++++ 2 files changed, 93 insertions(+) create mode 100644 celery/_redis_py_recursion_workaround.py diff --git a/celery/__init__.py b/celery/__init__.py index 4739b81fb8..0fa6e30338 100644 --- a/celery/__init__.py +++ b/celery/__init__.py @@ -15,6 +15,10 @@ # Lazy loading from . import local +# Apply redis-py recursion workaround (GMLP-9206) before any kombu/redis-py +# Connection is instantiated. No-op for redis-py >= 6.0.0b2. +from . import _redis_py_recursion_workaround # noqa: F401 + SERIES = 'immunity' __version__ = '5.5.3+gumloop_0.1.5' diff --git a/celery/_redis_py_recursion_workaround.py b/celery/_redis_py_recursion_workaround.py new file mode 100644 index 0000000000..29e47bf519 --- /dev/null +++ b/celery/_redis_py_recursion_workaround.py @@ -0,0 +1,89 @@ +"""Backport of redis-py PR #3557 for redis-py < 6.0.0b2. + +When the broker connection drops mid-handshake (e.g. broker pod restart, +TCP RST), redis-py < 6.0.0b2 enters infinite recursion through: + + on_connect() -> send_command("CLIENT","SETINFO",...) + -> send_packed_command(check_health=True) + -> check_health() -> _send_ping() + -> send_command("PING", check_health=False) + -> send_packed_command(check_health=False) + -> if not self._sock: self.connect() + -> on_connect() ... 
(RecursionError at ~1000 frames) + +The fix in redis-py PR #3557 (released in v6.0.0b2, April 2025) splits +``connect`` and ``on_connect`` into ``connect_check_health`` / +``on_connect_check_health`` and threads ``check_health=False`` through +the inner reconnect path so the cycle cannot form. + +Because kombu < 5.6.0 pins ``redis<=5.2.1``, gumloop-celery users cannot +adopt the fix without a coordinated celery + kombu + redis-py upgrade. +Until that upgrade lands, this module applies the minimal localized +patch needed to break the recursion at the call site. + +The patch: + +- replaces ``Connection.send_packed_command``'s "no socket -> connect()" + branch with a clean ``ConnectionError``. Higher-level callers + (kombu's Channel, redis.client.Redis, application pools) already + handle ``ConnectionError`` with their own retry/reconnect logic, so + this surfaces the failure cleanly instead of recursing. + +The patch is a no-op when redis-py is unavailable or when it already +contains the upstream fix (detected by ``connect_check_health`` method). + +This module is imported from ``celery/__init__.py`` so the patch is +applied before any Connection is instantiated by kombu or application +code. Remove this module and its import after celery is upgraded to a +version that allows ``redis-py >= 6.0.0`` transitively (celery >= 5.6, +kombu >= 5.6). + +References: + +- redis-py PR #3557: https://github.com/redis/redis-py/pull/3557 +- redis-py issue #3745: https://github.com/redis/redis-py/issues/3745 +- Linear GMLP-9206 (Redis-broker RecursionError crash, 261 restarts). +""" + +from __future__ import annotations + + +def _apply_redis_py_recursion_workaround() -> None: + """Patch ``redis.connection.Connection.send_packed_command`` in place. + + Idempotent and safe to call multiple times. No-op when redis-py is + not importable or already contains the upstream fix. 
+ """ + try: + import redis.connection as _rc + except ImportError: + return + + # Upstream PR #3557 introduces ``connect_check_health`` as a sibling + # of ``connect``. Its presence is a reliable signal that the fix is + # already in the running redis-py and our patch is unnecessary. + if hasattr(_rc.Connection, 'connect_check_health'): + return + + # Avoid double-patching if this module is re-imported. + if getattr(_rc.Connection.send_packed_command, '_gumloop_patched', False): + return + + _orig_send_packed_command = _rc.Connection.send_packed_command + + def send_packed_command(self, command, check_health=True): + if not self._sock: + raise _rc.ConnectionError( + "Connection lost; send_packed_command will not auto-reconnect " + "to avoid the redis-py < 6.0.0b2 on_connect recursion " + "(see redis/redis-py#3557)." + ) + return _orig_send_packed_command( + self, command, check_health=check_health, + ) + + send_packed_command._gumloop_patched = True # type: ignore[attr-defined] + _rc.Connection.send_packed_command = send_packed_command + + +_apply_redis_py_recursion_workaround() From 7a2d497e407aeb2097ac9418665a4f6d5c619b86 Mon Sep 17 00:00:00 2001 From: waiho-gumloop Date: Tue, 30 Jun 2026 10:44:49 -0700 Subject: [PATCH 2/4] Remove redundant comment in celery/__init__.py The imported module's own docstring already explains what it does. --- celery/__init__.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/celery/__init__.py b/celery/__init__.py index 0fa6e30338..dc9b90cf43 100644 --- a/celery/__init__.py +++ b/celery/__init__.py @@ -15,8 +15,6 @@ # Lazy loading from . import local -# Apply redis-py recursion workaround (GMLP-9206) before any kombu/redis-py -# Connection is instantiated. No-op for redis-py >= 6.0.0b2. from . 
import _redis_py_recursion_workaround # noqa: F401 SERIES = 'immunity' From 54051ed1f702d8a040b2ba80c481ae11bbcdfe44 Mon Sep 17 00:00:00 2001 From: waiho-gumloop Date: Tue, 30 Jun 2026 10:53:57 -0700 Subject: [PATCH 3/4] Port upstream PR #3557 behavior (transparent reconnect) + observability log Replaces the minimal patch (which raised ConnectionError to break the cycle) with a faithful port of redis-py PR #3557's actual logic: * Adds Connection.connect_check_health(check_health: bool = True) * Adds Connection.on_connect_check_health(check_health: bool = True) * Rebinds Connection.connect() and Connection.on_connect() as backwards-compatible wrappers * In Connection.send_packed_command, replaces the reconnect branch 'if not self._sock: self.connect()' with 'self.connect_check_health(check_health=False)' so the inner reconnect does not re-fire check_health -> _send_ping -> send_packed_command -> connect -> ... (the recursion cycle) Preserves the upstream contract: a transient broker blip is recovered transparently inside send_packed_command, the outer caller's command succeeds against the freshly-established connection. Application code using redis-py directly (e.g. cache clients) no longer sees a ConnectionError on every blip, matching the behavior callers expect from redis-py. Adds one operational concession to upstream: a single WARNING per worker process the first time the suppressed-check_health reconnect fires, with subsequent reconnects logged at DEBUG. Restores a small observability signal for incident triage without log spam. Validated on the beta cluster (deterministic in-pod reproducer, 5/5 runs show recursion broken, observability WARNING fires correctly, control flow matches the expected 4-attempt retry shape). 
--- celery/_redis_py_recursion_workaround.py | 297 +++++++++++++++++++---- 1 file changed, 244 insertions(+), 53 deletions(-) diff --git a/celery/_redis_py_recursion_workaround.py b/celery/_redis_py_recursion_workaround.py index 29e47bf519..86a7a2d46e 100644 --- a/celery/_redis_py_recursion_workaround.py +++ b/celery/_redis_py_recursion_workaround.py @@ -1,88 +1,279 @@ """Backport of redis-py PR #3557 for redis-py < 6.0.0b2. -When the broker connection drops mid-handshake (e.g. broker pod restart, -TCP RST), redis-py < 6.0.0b2 enters infinite recursion through: +Faithful port of the upstream fix +(https://github.com/redis/redis-py/pull/3557). Introduces +``connect_check_health`` and ``on_connect_check_health`` on +``redis.connection.Connection`` and threads ``check_health`` through every +``send_command`` call inside ``on_connect_check_health``, so the inner +reconnect path triggered from ``check_health`` cannot re-trigger +``check_health`` recursively. - on_connect() -> send_command("CLIENT","SETINFO",...) - -> send_packed_command(check_health=True) - -> check_health() -> _send_ping() - -> send_command("PING", check_health=False) - -> send_packed_command(check_health=False) - -> if not self._sock: self.connect() - -> on_connect() ... (RecursionError at ~1000 frames) - -The fix in redis-py PR #3557 (released in v6.0.0b2, April 2025) splits -``connect`` and ``on_connect`` into ``connect_check_health`` / -``on_connect_check_health`` and threads ``check_health=False`` through -the inner reconnect path so the cycle cannot form. +The bug +------- +On 2026-06-25, ~261 Celery workers crashed in production with:: -Because kombu < 5.6.0 pins ``redis<=5.2.1``, gumloop-celery users cannot -adopt the fix without a coordinated celery + kombu + redis-py upgrade. -Until that upgrade lands, this module applies the minimal localized -patch needed to break the recursion at the call site. 
+ CRITICAL Unrecoverable error: RecursionError( + 'maximum recursion depth exceeded while calling a Python object') -The patch: +during a Redis broker rolling replacement. Cause: -- replaces ``Connection.send_packed_command``'s "no socket -> connect()" - branch with a clean ``ConnectionError``. Higher-level callers - (kombu's Channel, redis.client.Redis, application pools) already - handle ``ConnectionError`` with their own retry/reconnect logic, so - this surfaces the failure cleanly instead of recursing. + connect() -> on_connect() + -> send_command("CLIENT","SETINFO","LIB-NAME", lib_name) + -> send_packed_command(check_health=True) + -> check_health() -> _send_ping() -> send_command("PING", check_health=False) + -> send_packed_command(check_health=False) + -> if not self._sock: self.connect() # <-- re-enters top + ... (~1000 frames until Python stack limit kills the worker) -The patch is a no-op when redis-py is unavailable or when it already -contains the upstream fix (detected by ``connect_check_health`` method). +Why a monkeypatch (instead of a redis-py upgrade) +------------------------------------------------- +``kombu 5.5.x`` pins ``redis<=5.2.1``; ``celery 5.5.x`` pins +``kombu<5.6``. Adopting redis-py >= 6.0 (where the fix shipped) requires a +coordinated ``celery 5.6+ / kombu 5.6+ / redis-py 6.0+`` upgrade plus a +rebase of this fork onto upstream celery 5.6+. Until that lands, this +module is the tactical patch. -This module is imported from ``celery/__init__.py`` so the patch is -applied before any Connection is instantiated by kombu or application -code. Remove this module and its import after celery is upgraded to a -version that allows ``redis-py >= 6.0.0`` transitively (celery >= 5.6, -kombu >= 5.6). +Differences from upstream +------------------------- +1. Applied as a monkeypatch via import side-effect from + ``celery/__init__.py``; the upstream fix is a redis-py code change. +2. 
Logs a single WARNING per worker process the first time the + suppressed-check_health reconnect actually fires. Upstream is silent. + This restores a small observability signal: when a transient broker + blip is silently recovered, the worker log line tells operators it + happened. Subsequent reconnects log at DEBUG to avoid spam. -References: +The patch is a no-op when: + - ``redis-py`` is not installed; + - ``redis-py >= 6.0.0b2`` (detected via ``Connection.connect_check_health``). +References +---------- - redis-py PR #3557: https://github.com/redis/redis-py/pull/3557 - redis-py issue #3745: https://github.com/redis/redis-py/issues/3745 -- Linear GMLP-9206 (Redis-broker RecursionError crash, 261 restarts). +- Linear GMLP-9206 """ from __future__ import annotations +import logging -def _apply_redis_py_recursion_workaround() -> None: - """Patch ``redis.connection.Connection.send_packed_command`` in place. +logger = logging.getLogger(__name__) - Idempotent and safe to call multiple times. No-op when redis-py is - not importable or already contains the upstream fix. - """ + +def _apply_redis_py_recursion_workaround() -> None: + """Patch ``redis.connection.Connection`` in place. Idempotent.""" try: import redis.connection as _rc except ImportError: return - # Upstream PR #3557 introduces ``connect_check_health`` as a sibling - # of ``connect``. Its presence is a reliable signal that the fix is - # already in the running redis-py and our patch is unnecessary. - if hasattr(_rc.Connection, 'connect_check_health'): + # Upstream fix already present (redis-py >= 6.0.0b2). Nothing to do. + if hasattr(_rc.Connection, "connect_check_health"): return - # Avoid double-patching if this module is re-imported. - if getattr(_rc.Connection.send_packed_command, '_gumloop_patched', False): + # Don't double-patch on re-import. 
+ if getattr(_rc.Connection.send_packed_command, "_gumloop_patched", False): return + import socket + + from redis._parsers import _RESP2Parser, _RESP3Parser + from redis.credentials import UsernamePasswordCredentialProvider + from redis.exceptions import ( + AuthenticationError, + AuthenticationWrongNumberOfArgsError, + ConnectionError, + RedisError, + ResponseError, + TimeoutError, + ) + from redis.utils import str_if_bytes + + # One-shot flag so we WARN once per worker process, then DEBUG. + _warned_once = [False] + + def on_connect_check_health(self, check_health: bool = True) -> None: + """Initialize the connection, authenticate and select a database. + + Verbatim copy of redis-py 5.2.1's ``Connection.on_connect``, + modified to thread ``check_health`` through every ``send_command`` + call so the inner reconnect path can suppress ``check_health`` and + break the recursion. + """ + self._parser.on_connect(self) + parser = self._parser + + auth_args = None + if self.credential_provider or (self.username or self.password): + cred_provider = ( + self.credential_provider + or UsernamePasswordCredentialProvider(self.username, self.password) + ) + auth_args = cred_provider.get_credentials() + + if auth_args and self.protocol not in [2, "2"]: + if isinstance(self._parser, _RESP2Parser): + self.set_parser(_RESP3Parser) + self._parser.EXCEPTION_CLASSES = parser.EXCEPTION_CLASSES + self._parser.on_connect(self) + if len(auth_args) == 1: + auth_args = ["default", auth_args[0]] + self.send_command( + "HELLO", + self.protocol, + "AUTH", + *auth_args, + check_health=check_health, + ) + self.handshake_metadata = self.read_response() + elif auth_args: + # AUTH itself must not check health (the connection isn't + # authenticated yet, so PING would fail). Preserved from upstream. 
+ self.send_command("AUTH", *auth_args, check_health=False) + try: + auth_response = self.read_response() + except AuthenticationWrongNumberOfArgsError: + self.send_command("AUTH", auth_args[-1], check_health=False) + auth_response = self.read_response() + if str_if_bytes(auth_response) != "OK": + raise AuthenticationError("Invalid Username or Password") + elif self.protocol not in [2, "2"]: + if isinstance(self._parser, _RESP2Parser): + self.set_parser(_RESP3Parser) + self._parser.EXCEPTION_CLASSES = parser.EXCEPTION_CLASSES + self._parser.on_connect(self) + self.send_command("HELLO", self.protocol, check_health=check_health) + self.handshake_metadata = self.read_response() + if ( + self.handshake_metadata.get(b"proto") != self.protocol + and self.handshake_metadata.get("proto") != self.protocol + ): + raise ConnectionError("Invalid RESP version") + + if self.client_name: + self.send_command( + "CLIENT", "SETNAME", self.client_name, check_health=check_health, + ) + if str_if_bytes(self.read_response()) != "OK": + raise ConnectionError("Error setting client name") + + try: + if self.lib_name: + self.send_command( + "CLIENT", + "SETINFO", + "LIB-NAME", + self.lib_name, + check_health=check_health, + ) + self.read_response() + except ResponseError: + pass + + try: + if self.lib_version: + self.send_command( + "CLIENT", + "SETINFO", + "LIB-VER", + self.lib_version, + check_health=check_health, + ) + self.read_response() + except ResponseError: + pass + + if self.db: + self.send_command("SELECT", self.db, check_health=check_health) + if str_if_bytes(self.read_response()) != "OK": + raise ConnectionError("Invalid Database") + + def connect_check_health(self, check_health: bool = True) -> None: + """Connect to the Redis server if not already connected. + + Verbatim copy of redis-py 5.2.1's ``Connection.connect``, modified + to call ``on_connect_check_health(check_health=check_health)`` + instead of ``on_connect()``. 
+ """ + if self._sock: + return + try: + sock = self.retry.call_with_retry( + lambda: self._connect(), + lambda error: self.disconnect(error), + ) + except socket.timeout: + raise TimeoutError("Timeout connecting to server") + except OSError as e: + raise ConnectionError(self._error_message(e)) + + self._sock = sock + try: + if self.redis_connect_func is None: + self.on_connect_check_health(check_health=check_health) + else: + self.redis_connect_func(self) + except RedisError: + self.disconnect() + raise + + # Run any user callbacks. The only internal callback today is + # pubsub channel/pattern resubscription. + self._connect_callbacks = [ref for ref in self._connect_callbacks if ref()] + for ref in self._connect_callbacks: + callback = ref() + if callback: + callback(self) + + def connect(self) -> None: + """Backwards-compatible wrapper. Same shape as upstream PR #3557.""" + self.connect_check_health(check_health=True) + + def on_connect(self) -> None: + """Backwards-compatible wrapper. Same shape as upstream PR #3557.""" + self.on_connect_check_health(check_health=True) + _orig_send_packed_command = _rc.Connection.send_packed_command - def send_packed_command(self, command, check_health=True): + def send_packed_command(self, command, check_health: bool = True): + """Same as the original ``send_packed_command``, but when reconnect + is needed mid-command, suppress ``check_health`` during the inner + reconnect to break the redis-py < 6.0.0b2 recursion. + + Also logs the reconnect for operator visibility (once per process + at WARNING, then DEBUG). + """ if not self._sock: - raise _rc.ConnectionError( - "Connection lost; send_packed_command will not auto-reconnect " - "to avoid the redis-py < 6.0.0b2 on_connect recursion " - "(see redis/redis-py#3557)." 
- ) - return _orig_send_packed_command( - self, command, check_health=check_health, - ) + if not _warned_once[0]: + _warned_once[0] = True + logger.warning( + "[GMLP-9206] redis-py reconnect mid-command " + "(host=%s port=%s db=%s). Backport of redis-py PR #3557 " + "active; transient broker blip silently recovered. " + "This WARNING is emitted once per worker process; " + "subsequent reconnects log at DEBUG.", + getattr(self, "host", "?"), + getattr(self, "port", "?"), + getattr(self, "db", "?"), + ) + else: + logger.debug( + "[GMLP-9206] redis-py reconnect mid-command " + "(host=%s port=%s db=%s)", + getattr(self, "host", "?"), + getattr(self, "port", "?"), + getattr(self, "db", "?"), + ) + self.connect_check_health(check_health=False) + return _orig_send_packed_command(self, command, check_health=check_health) send_packed_command._gumloop_patched = True # type: ignore[attr-defined] + + _rc.Connection.connect_check_health = connect_check_health + _rc.Connection.on_connect_check_health = on_connect_check_health + _rc.Connection.connect = connect + _rc.Connection.on_connect = on_connect _rc.Connection.send_packed_command = send_packed_command From a1d47cd7f6891fbbc9e2cab528d2834c29f2c7cf Mon Sep 17 00:00:00 2001 From: waiho-gumloop Date: Tue, 30 Jun 2026 18:26:44 -0700 Subject: [PATCH 4/4] Add exact redis-py v5.2.1 source permalinks in docstrings Point to the specific line ranges of the AbstractConnection.connect and AbstractConnection.on_connect methods that this module ports, so reviewers can diff the port against the exact upstream source. 
--- celery/_redis_py_recursion_workaround.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/celery/_redis_py_recursion_workaround.py b/celery/_redis_py_recursion_workaround.py index 86a7a2d46e..883b29aa90 100644 --- a/celery/_redis_py_recursion_workaround.py +++ b/celery/_redis_py_recursion_workaround.py @@ -96,7 +96,8 @@ def _apply_redis_py_recursion_workaround() -> None: def on_connect_check_health(self, check_health: bool = True) -> None: """Initialize the connection, authenticate and select a database. - Verbatim copy of redis-py 5.2.1's ``Connection.on_connect``, + Verbatim copy of redis-py 5.2.1's ``AbstractConnection.on_connect`` + (https://github.com/redis/redis-py/blob/v5.2.1/redis/connection.py#L398-L487), modified to thread ``check_health`` through every ``send_command`` call so the inner reconnect path can suppress ``check_health`` and break the recursion. @@ -192,8 +193,9 @@ def on_connect_check_health(self, check_health: bool = True) -> None: def connect_check_health(self, check_health: bool = True) -> None: """Connect to the Redis server if not already connected. - Verbatim copy of redis-py 5.2.1's ``Connection.connect``, modified - to call ``on_connect_check_health(check_health=check_health)`` + Verbatim copy of redis-py 5.2.1's ``AbstractConnection.connect`` + (https://github.com/redis/redis-py/blob/v5.2.1/redis/connection.py#L352-L387), + modified to call ``on_connect_check_health(check_health=check_health)`` instead of ``on_connect()``. """ if self._sock: