Skip to content

Commit a73e994

Browse files
committed
Silence gRPC error spam on EAGAIN
Signed-off-by: Sergio Herrera <627709+seherv@users.noreply.github.com>
1 parent 8cf248b commit a73e994

3 files changed

Lines changed: 100 additions & 0 deletions

File tree

ext/dapr-ext-workflow/AGENTS.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,8 @@ Workflow (orchestrator) functions must remain generators (`def` with `yield`). T
122122

123123
**Concurrency sizing and load characterization.** See `docs/concurrency.md` for sizing recommendations (`maximum_concurrent_activity_work_items`, `maximum_thread_pool_workers`) and an async-vs-sync decision tree. The `benchmarks/` directory ships `bench_async_activities.py`; re-run it locally before claiming a perf regression. The generated `RESULTS.md` is gitignored because numbers are machine-specific; see `docs/concurrency.md` for the regen command.
124124

125+
**grpc.aio poller log noise.** The async client can emit benign `BlockingIOError: [Errno 11]` ERROR lines from `grpc.aio`'s `PollerCompletionQueue` under load. It is harmless and retried. `get_grpc_aio_channel` installs an internal `asyncio`-logger filter (`_silence_grpc_aio_poller_noise`) that drops only those records, so the SDK suppresses it automatically with no user action.
126+
125127

126128
### DaprWorkflowClient (`dapr_workflow_client.py`)
127129

ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/aio/internal/shared.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
# See the License for the specific language governing permissions and
1010
# limitations under the License.
1111

12+
import logging
1213
from typing import Optional, Sequence, Union
1314

1415
import grpc
@@ -27,6 +28,30 @@
2728
grpc_aio.StreamStreamClientInterceptor,
2829
]
2930

31+
_POLLER_NOISE_MARKER = 'PollerCompletionQueue._handle_events'
32+
33+
34+
class _GrpcAioPollerNoiseFilter(logging.Filter):
35+
"""Drops the harmless grpc.aio poller BlockingIOError (EAGAIN) records.
36+
37+
The poller does a non-blocking read on its wake-up fd and can get EAGAIN, which
38+
asyncio logs at ERROR even though the read is retried and nothing is lost.
39+
"""
40+
41+
def filter(self, record: logging.LogRecord) -> bool:
42+
exc = record.exc_info[1] if record.exc_info else None
43+
is_poller_noise = isinstance(exc, BlockingIOError) and (
44+
_POLLER_NOISE_MARKER in record.getMessage()
45+
)
46+
return not is_poller_noise
47+
48+
49+
def _silence_grpc_aio_poller_noise() -> None:
50+
"""Install the poller-noise filter on the asyncio logger if not already present."""
51+
asyncio_logger = logging.getLogger('asyncio')
52+
if not any(isinstance(f, _GrpcAioPollerNoiseFilter) for f in asyncio_logger.filters):
53+
asyncio_logger.addFilter(_GrpcAioPollerNoiseFilter())
54+
3055

3156
def get_grpc_aio_channel(
3257
host_address: Optional[str],
@@ -42,6 +67,8 @@ def get_grpc_aio_channel(
4267
interceptors: Optional sequence of client interceptors to apply to the channel.
4368
options: Optional sequence of gRPC channel options as (key, value) tuples. Keys defined in https://grpc.github.io/grpc/core/group__grpc__arg__keys.html
4469
"""
70+
_silence_grpc_aio_poller_noise()
71+
4572
if host_address is None:
4673
host_address = get_default_host_address()
4774

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
# Copyright 2026 The Dapr Authors
2+
# Licensed under the Apache License, Version 2.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
# http://www.apache.org/licenses/LICENSE-2.0
6+
# Unless required by applicable law or agreed to in writing, software
7+
# distributed under the License is distributed on an "AS IS" BASIS,
8+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
9+
# See the License for the specific language governing permissions and
10+
# limitations under the License.
11+
12+
import logging
13+
import sys
14+
from unittest.mock import patch
15+
16+
from dapr.ext.workflow._durabletask.aio.internal import shared
17+
18+
HOST_ADDRESS = 'localhost:50051'
19+
20+
21+
def _record(msg: str, exc: BaseException | None) -> logging.LogRecord:
22+
exc_info = None
23+
if exc is not None:
24+
try:
25+
raise exc
26+
except BaseException:
27+
exc_info = sys.exc_info()
28+
return logging.LogRecord('asyncio', logging.ERROR, __file__, 1, msg, (), exc_info)
29+
30+
31+
def test_filter_drops_poller_eagain_record():
32+
record = _record(
33+
'Exception in callback PollerCompletionQueue._handle_events()',
34+
BlockingIOError(11, 'Resource temporarily unavailable'),
35+
)
36+
assert shared._GrpcAioPollerNoiseFilter().filter(record) is False
37+
38+
39+
def test_filter_keeps_record_without_exception():
40+
assert shared._GrpcAioPollerNoiseFilter().filter(_record('some other error', None)) is True
41+
42+
43+
def test_filter_keeps_blockingioerror_without_marker():
44+
record = _record('unrelated message', BlockingIOError(11, 'nope'))
45+
assert shared._GrpcAioPollerNoiseFilter().filter(record) is True
46+
47+
48+
def test_get_grpc_aio_channel_installs_filter_on_asyncio_logger():
49+
asyncio_logger = logging.getLogger('asyncio')
50+
for existing in [
51+
f for f in asyncio_logger.filters if isinstance(f, shared._GrpcAioPollerNoiseFilter)
52+
]:
53+
asyncio_logger.removeFilter(existing)
54+
55+
with patch('dapr.ext.workflow._durabletask.aio.internal.shared.grpc_aio.insecure_channel'):
56+
shared.get_grpc_aio_channel(HOST_ADDRESS, False)
57+
58+
installed = [
59+
f for f in asyncio_logger.filters if isinstance(f, shared._GrpcAioPollerNoiseFilter)
60+
]
61+
assert len(installed) == 1 # installed once, not duplicated
62+
63+
64+
def test_install_is_idempotent():
65+
asyncio_logger = logging.getLogger('asyncio')
66+
shared._silence_grpc_aio_poller_noise()
67+
shared._silence_grpc_aio_poller_noise()
68+
installed = [
69+
f for f in asyncio_logger.filters if isinstance(f, shared._GrpcAioPollerNoiseFilter)
70+
]
71+
assert len(installed) == 1

0 commit comments

Comments
 (0)