Commit cb1519a

jeet1995 and Copilot authored
Add exception handler on HTTP/2 parent channel to suppress WARN logs (#48890)
* Add exception handler on HTTP/2 parent channel to suppress WARN logs

  In HTTP/2, reactor-netty multiplexes streams on a shared parent TCP connection. The parent channel pipeline has no ChannelOperationsHandler (unlike HTTP/1.1), so TCP-level exceptions like Connection reset by peer (ECONNRESET) propagate to Netty's TailContext, which logs them as WARN.

  This adds Http2ParentChannelExceptionHandler to the parent channel via doOnConnected (accessing channel.parent()). The handler consumes exceptions at DEBUG level WITHOUT closing the channel or altering connection lifecycle, matching HTTP/1.1 logging behavior.

  Changes:
  - Handler logs cause.toString() (not getMessage()) for null-safe diagnostics
  - Defensive try-catch for duplicate handler name on concurrent stream creation
  - Before/after verified with EmbeddedChannel unit tests

* Revert azure-cosmos-tests pom.xml changes

* Clarify why duplicate-name is the only possible IAE in handler install

* Move static utility method to bottom of test file

* Address Copilot review: fix comment accuracy, remove duplicate cause.toString(), update changelog

* Add tests for activeStreams > 0 path (WARN branch)

  Address Bhaskar's review: add two tests covering the else branch where activeStreams > 0 on an active channel, exercising the WARN log path.
  - withHandler_activeStreams_consumedAtWarn: creates an active H2 stream via codec.connection().local().createStream(), fires an exception, and verifies it is consumed (does not reach TailContext).
  - withHandler_activeStreams_channelNotClosed: same setup, verifies the handler does not close the channel even with active streams.

* Add test for codec-absent fallback path

  When Http2FrameCodec is absent from the pipeline, getActiveStreamCount() returns -1. Since -1 != 0 and channelActive == true, the handler takes the safe WARN path. This test covers that fallback behavior.

* Address review: nullable getActiveStreamCount, Error propagation, debug log in catch

  - Change getActiveStreamCount() to return Integer (nullable) instead of int with -1 sentinel. null explicitly means 'could not determine' and takes the safe WARN path. (Addresses Fabian's review)
  - Add logger.debug in catch block so codec retrieval failures are observable instead of silently swallowed.
  - Add Error guard in exceptionCaught: Error types (OOM, SOF) propagate to TailContext instead of being consumed. (Addresses Xinlian's review)
  - Add withHandler_errorNotConsumed_propagatesToTail test.
  - Update Javadoc to reflect Exception-only consumption and Error passthrough.

* Fix: install H2 exception handler on correct channel

  In reactor-netty's H2 path, doOnConnected fires once per TCP connection and connection.channel() IS the parent channel (channel.parent() is null). The previous code assumed doOnConnected fires for child/stream channels where channel.parent() would return the TCP parent.

  Fix: resolve the H2 parent as channel.parent() ?? channel, handling both the observed case (parent=null, channel IS the parent) and the alternate case (parent!=null, install on parent).

  Verified with integration test:
  - Linux/epoll with TCP RST proxy (SO_LINGER=0, 30s idle timeout)
  - 4.79.1 baseline: TailContext WARN appeared (Connection reset by peer)
  - Fixed build: WARN suppressed, handler logged at DEBUG (activeStreams=0)

* Simplify handler installation: use channelPipeline directly

  doOnConnected fires for the parent TCP channel in reactor-netty's H2 path, so connection.channel() IS the parent. No need for channel.parent() resolution.

* Address Kushagra review: log addresses, make handler @Sharable singleton

  - Add local/remote address to WARN and DEBUG log messages for diagnostic parity with RNTBD connection loggers
  - Mark handler @ChannelHandler.Sharable with singleton INSTANCE (handler is stateless - no instance fields)
  - Update ReactorNettyClient to use INSTANCE instead of new
  - Update tests to use INSTANCE

* Use string append for log messages, pass channel directly

  Matches PartitionProcessor/HealthChecker patterns - avoids SLF4J inline formatting issues. Channel.toString() provides L:/R: addresses.

* Use channel= prefix in log messages for clarity

* Add clientVmId to exception handler log messages

  Resolve vmId lazily via ClientTelemetry.getMachineId(null) on first access from non-event-loop thread. Store as immutable field in the @Sharable handler singleton.

* Fix: resolve vmId eagerly on caller thread, not event loop

  Remove lazy singleton pattern (getOrCreateInstance) that could call ClientTelemetry.getMachineId() on the Netty event loop (5s blocking). Instead, create handler eagerly in configureChannelPipelineHandlers() which runs on the caller's setup thread. The @Sharable handler instance is captured by the doOnConnected lambda.

* Fix: use non-blocking getCachedMachineId() for vmId

  Remove all blocking calls. Add ClientTelemetry.getCachedMachineId() which reads a volatile field populated by getMachineId() during client init. Handler reads it at log time - pure volatile read, zero blocking. Restores static INSTANCE singleton (handler is stateless again).

* Use n/a fallback for unresolved vmId in logs

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent d3eead7 commit cb1519a

5 files changed

Lines changed: 385 additions & 0 deletions


@@ -0,0 +1,246 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos.implementation.http;
+
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.codec.http2.Http2FrameCodec;
+import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
+import io.netty.handler.codec.http2.Http2MultiplexHandler;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Verifies that {@link Http2ParentChannelExceptionHandler} uses connection
+ * state (active stream count and channel activity) to determine whether
+ * exceptions are logged at DEBUG (suppressed) or WARN (preserved).
+ * For Exceptions, type is NOT a filtering dimension; only Errors pass through.
+ *
+ * The EmbeddedChannel is configured to mirror the production HTTP/2 parent
+ * channel pipeline:
+ * <pre>
+ * Http2FrameCodec → Http2MultiplexHandler → Http2ParentChannelExceptionHandler → TailContext
+ * </pre>
+ * (SslHandler is omitted because it requires an SSLContext and is not relevant
+ * to exception propagation behavior.)
+ *
+ * {@code checkException()} re-throws any exception that reached the pipeline tail.
+ */
+public class Http2ParentChannelExceptionHandlerTest {
+
+    /**
+     * BEFORE fix: without the handler, exceptions reach the pipeline tail.
+     * EmbeddedChannel's checkException() re-throws the unhandled exception,
+     * proving it reached Netty's TailContext (which in production logs as WARN).
+     */
+    @Test(groups = "unit")
+    public void withoutHandler_exceptionReachesTail() {
+        EmbeddedChannel channel = createH2ParentChannel(false);
+
+        channel.pipeline().fireExceptionCaught(
+            new IOException("Connection reset by peer"));
+
+        assertThatThrownBy(channel::checkException)
+            .isInstanceOf(IOException.class)
+            .hasMessageContaining("Connection reset by peer");
+
+        channel.finishAndReleaseAll();
+    }
+
+    /**
+     * With handler: exception on idle connection (0 active streams) is
+     * consumed at DEBUG. The suppression is based on connection state
+     * (no active streams), not exception type.
+     *
+     * In production, channel.isActive() transitions to false during the
+     * RST handling cycle, satisfying the OR condition. In EmbeddedChannel
+     * we can only verify the activeStreams == 0 branch.
+     */
+    @Test(groups = "unit")
+    public void withHandler_zeroActiveStreams_consumedAtDebug() {
+        EmbeddedChannel channel = createH2ParentChannel(true);
+
+        Http2FrameCodec codec = channel.pipeline().get(Http2FrameCodec.class);
+        assertThat(codec).isNotNull();
+        assertThat(codec.connection().numActiveStreams()).isEqualTo(0);
+
+        channel.pipeline().fireExceptionCaught(
+            new IOException("recvAddress(..) failed with error(-104): Connection reset by peer"));
+
+        // Exception consumed: does NOT reach tail
+        channel.checkException();
+
+        channel.finishAndReleaseAll();
+    }
+
+    /**
+     * Handler does not close the channel: connection lifecycle is managed
+     * by reactor-netty's pool eviction, not by this handler.
+     */
+    @Test(groups = "unit")
+    public void withHandler_exceptionDoesNotCloseChannel() {
+        EmbeddedChannel channel = createH2ParentChannel(true);
+
+        assertThat(channel.isActive()).isTrue();
+
+        channel.pipeline().fireExceptionCaught(
+            new IOException("Connection reset by peer"));
+
+        channel.checkException();
+        assertThat(channel.isOpen()).isTrue();
+
+        channel.finishAndReleaseAll();
+    }
+
+    /**
+     * RuntimeException on idle connection is also consumed: suppression
+     * is based on connection state, not exception type.
+     */
+    @Test(groups = "unit")
+    public void withHandler_runtimeException_zeroActiveStreams_consumed() {
+        EmbeddedChannel channel = createH2ParentChannel(true);
+
+        channel.pipeline().fireExceptionCaught(
+            new RuntimeException("Unexpected state error"));
+
+        channel.checkException();
+
+        channel.finishAndReleaseAll();
+    }
+
+    /**
+     * NullPointerException on idle connection is also consumed: same
+     * connection-state-based suppression regardless of exception type.
+     */
+    @Test(groups = "unit")
+    public void withHandler_npe_zeroActiveStreams_consumed() {
+        EmbeddedChannel channel = createH2ParentChannel(true);
+
+        channel.pipeline().fireExceptionCaught(
+            new NullPointerException("handler bug"));
+
+        channel.checkException();
+
+        channel.finishAndReleaseAll();
+    }
+
+    /**
+     * With handler: exception on a connection with active streams is
+     * consumed (does not reach TailContext). The handler logs at WARN
+     * instead of DEBUG because in-flight requests may be affected.
+     */
+    @Test(groups = "unit")
+    public void withHandler_activeStreams_consumedAtWarn() throws Exception {
+        EmbeddedChannel channel = createH2ParentChannel(true);
+
+        Http2FrameCodec codec = channel.pipeline().get(Http2FrameCodec.class);
+        assertThat(codec).isNotNull();
+
+        // Create an active stream (client-initiated, odd stream ID)
+        codec.connection().local().createStream(1, false);
+        assertThat(codec.connection().numActiveStreams()).isEqualTo(1);
+        assertThat(channel.isActive()).isTrue();
+
+        channel.pipeline().fireExceptionCaught(
+            new IOException("Connection reset by peer"));
+
+        // Exception consumed: does NOT reach tail, even with active streams
+        channel.checkException();
+
+        channel.finishAndReleaseAll();
+    }
+
+    /**
+     * Handler does not close the channel even when active streams exist:
+     * connection lifecycle is managed by reactor-netty's pool eviction.
+     */
+    @Test(groups = "unit")
+    public void withHandler_activeStreams_channelNotClosed() throws Exception {
+        EmbeddedChannel channel = createH2ParentChannel(true);
+
+        Http2FrameCodec codec = channel.pipeline().get(Http2FrameCodec.class);
+        assertThat(codec).isNotNull();
+
+        codec.connection().local().createStream(1, false);
+        assertThat(codec.connection().numActiveStreams()).isEqualTo(1);
+        assertThat(channel.isActive()).isTrue();
+
+        channel.pipeline().fireExceptionCaught(
+            new IOException("Connection reset by peer"));
+
+        channel.checkException();
+        assertThat(channel.isOpen()).isTrue();
+
+        channel.finishAndReleaseAll();
+    }
+
+    /**
+     * With handler: when Http2FrameCodec is absent from the pipeline,
+     * getActiveStreamCount() returns null. Since the active stream count
+     * is unknown and the channel is active, the handler takes the safe
+     * WARN path. This covers the fallback behavior when the codec is
+     * unavailable (e.g., torn down during shutdown).
+     */
+    @Test(groups = "unit")
+    public void withHandler_codecAbsent_fallsBackToWarnPath() {
+        EmbeddedChannel channel = new EmbeddedChannel(
+            Http2ParentChannelExceptionHandler.INSTANCE);
+
+        assertThat(channel.pipeline().get(Http2FrameCodec.class)).isNull();
+        assertThat(channel.isActive()).isTrue();
+
+        channel.pipeline().fireExceptionCaught(
+            new IOException("Connection reset by peer"));
+
+        // Exception consumed: does NOT reach tail
+        channel.checkException();
+        assertThat(channel.isOpen()).isTrue();
+
+        channel.finishAndReleaseAll();
+    }
+
+    /**
+     * Error types (e.g., OutOfMemoryError) are NOT consumed by the handler;
+     * they propagate to TailContext. This ensures JVM-level errors are never
+     * silently swallowed.
+     */
+    @Test(groups = "unit")
+    public void withHandler_errorNotConsumed_propagatesToTail() {
+        EmbeddedChannel channel = createH2ParentChannel(true);
+
+        channel.pipeline().fireExceptionCaught(
+            new OutOfMemoryError("test OOM"));
+
+        assertThatThrownBy(channel::checkException)
+            .isInstanceOf(OutOfMemoryError.class)
+            .hasMessageContaining("test OOM");
+
+        channel.finishAndReleaseAll();
+    }
+
+    /**
+     * Creates an EmbeddedChannel matching the production HTTP/2 parent channel
+     * pipeline (minus SslHandler): Http2FrameCodec → Http2MultiplexHandler →
+     * Http2ParentChannelExceptionHandler.
+     */
+    private static EmbeddedChannel createH2ParentChannel(boolean withExceptionHandler) {
+        Http2FrameCodec codec = Http2FrameCodecBuilder.forClient()
+            .autoAckSettingsFrame(true)
+            .build();
+
+        Http2MultiplexHandler multiplexHandler = new Http2MultiplexHandler(
+            new ChannelInboundHandlerAdapter());
+
+        if (withExceptionHandler) {
+            return new EmbeddedChannel(codec, multiplexHandler,
+                Http2ParentChannelExceptionHandler.INSTANCE);
+        } else {
+            return new EmbeddedChannel(codec, multiplexHandler);
+        }
+    }
+}

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -13,6 +13,7 @@
 * Fixed an issue where `CustomItemSerializer` was incorrectly applied to internal SDK query pipeline structures (e.g., `OrderByRowResult`, `Document`), causing deserialization failures in ORDER BY, GROUP BY, aggregate, DISTINCT, and hybrid search queries. - See [PR 48811](https://github.com/Azure/azure-sdk-for-java/pull/48811)
 * Fixed an issue where `SqlParameter` ignored the configured `CustomItemSerializer`, always using the internal default serializer instead. - See [PR 48811](https://github.com/Azure/azure-sdk-for-java/pull/48811)
 * Fixed a `ClientTelemetry` static initialization failure when IMDS access is disabled, preventing `NoClassDefFoundError` during Cosmos client creation in non-Azure environments. - See [PR 48888](https://github.com/Azure/azure-sdk-for-java/pull/48888)
+* Fixed an issue where Netty could log "An exceptionCaught() event was fired, and it reached at the tail of the pipeline" at WARN on HTTP/2 connections when the server resets idle TCP connections. The fix adds an exception handler on the HTTP/2 parent channel that consumes these connection-level exceptions and logs them at DEBUG when no streams are in flight. - See [PR 48890](https://github.com/Azure/azure-sdk-for-java/pull/48890)

 #### Other Changes

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/clienttelemetry/ClientTelemetry.java

Lines changed: 16 additions & 0 deletions
@@ -121,6 +121,21 @@ public CosmosClientTelemetryConfig getClientTelemetryConfig() {
         return clientTelemetryConfig;
     }

+    // Non-blocking cache for machine ID. Populated by getMachineId() on first
+    // successful IMDS resolution (called during client init on non-event-loop threads).
+    // Read by components that cannot block (e.g., Netty channel handlers).
+    private static volatile String cachedMachineId;
+
+    /**
+     * Returns the cached machine ID without blocking.
+     * Returns {@code "n/a"} if the machine ID has not yet been resolved
+     * (i.e., getMachineId() has not yet succeeded on a non-event-loop thread).
+     */
+    public static String getCachedMachineId() {
+        String id = cachedMachineId;
+        return id != null ? id : "n/a";
+    }
+
     /**
      * Blocking version of machine ID lookup. Used by Spark connector (CosmosClientCache.scala).
      * Delegates to getMachineId which waits up to 5s for IMDS metadata.
@@ -136,6 +151,7 @@ public static String getMachineId(DiagnosticsClientContext.DiagnosticsClientConf
         AzureVMMetadata metadata = CACHED_METADATA.block(Duration.ofSeconds(5));
         if (metadata != null && metadata != METADATA_NOT_AVAILABLE && metadata.getVmId() != null) {
             String machineId = VM_ID_PREFIX + metadata.getVmId();
+            cachedMachineId = machineId;
             if (diagnosticsClientConfig != null) {
                 diagnosticsClientConfig.withMachineId(machineId);
             }
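
The caching pattern in this hunk can be sketched in isolation: a slow resolver publishes its result through a `volatile` field, and hot paths read that field without blocking. The following is a simplified stand-in, not the SDK's code. `MachineIdCache`, `resolveBlocking`, and `getCached` are hypothetical names, and the `Supplier` argument takes the place of the IMDS lookup.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for the volatile machine-ID cache shown in the hunk above. */
final class MachineIdCache {

    // Written only by the blocking path; read by any thread.
    private static volatile String cached;

    private MachineIdCache() {
    }

    /** Blocking path: intended for setup threads, since slowLookup may block. */
    static String resolveBlocking(Supplier<String> slowLookup) {
        String id = slowLookup.get();
        if (id != null) {
            cached = id; // volatile write makes the value visible to later readers
        }
        return id;
    }

    /** Non-blocking path: one volatile read, with a placeholder when unresolved. */
    static String getCached() {
        String id = cached; // read once so the null check and the return see the same value
        return id != null ? id : "n/a";
    }
}
```

In this sketch a failed lookup (a `null` result) leaves the cache untouched, so readers keep seeing `"n/a"` until some later resolution succeeds.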
Lines changed: 104 additions & 0 deletions
@@ -0,0 +1,104 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos.implementation.http;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.http2.Http2FrameCodec;
+import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Exception handler for the HTTP/2 parent (TCP) channel pipeline.
+ * <p>
+ * In HTTP/2, reactor-netty multiplexes streams on a shared parent TCP connection.
+ * Child stream channels have {@code ChannelOperationsHandler} which catches exceptions
+ * and fails the active subscriber (matching HTTP/1.1 behavior). However, the parent
+ * channel has no such handler, so exceptions propagate to Netty's {@code TailContext}
+ * which logs them as WARN ("An exceptionCaught() event was fired, and it reached at
+ * the tail of the pipeline").
+ * <p>
+ * This handler consumes {@link Exception}s on the parent channel and uses connection
+ * state to decide the log level:
+ * <ul>
+ *   <li><b>DEBUG</b>: when {@code activeStreams == 0} OR {@code !channelActive}.
+ *       No in-flight requests are affected (e.g., TCP RST from LB idle timeout,
+ *       post-close cleanup).</li>
+ *   <li><b>WARN</b>: when active streams exist on a live channel, or when the
+ *       active stream count cannot be determined. The exception may affect
+ *       in-flight requests and is worth alerting on.</li>
+ * </ul>
+ * <p>
+ * {@link Error} types (e.g., {@code OutOfMemoryError}) are never consumed; they
+ * propagate to {@code TailContext} for standard Netty handling.
+ * <p>
+ * The handler does NOT close the channel or alter connection lifecycle: reactor-netty
+ * and the connection pool's eviction predicate ({@code !channel.isActive()}) handle that
+ * independently.
+ *
+ * @see ReactorNettyClient#configureChannelPipelineHandlers()
+ */
+@ChannelHandler.Sharable
+final class Http2ParentChannelExceptionHandler extends ChannelInboundHandlerAdapter {
+
+    static final Http2ParentChannelExceptionHandler INSTANCE = new Http2ParentChannelExceptionHandler();
+
+    static final String HANDLER_NAME = "cosmosH2ParentExceptionHandler";
+
+    private static final Logger logger = LoggerFactory.getLogger(Http2ParentChannelExceptionHandler.class);
+
+    private Http2ParentChannelExceptionHandler() {
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        // Do not consume JVM-level errors (OOM, StackOverflow, etc.); let them
+        // propagate to TailContext for standard Netty handling.
+        if (cause instanceof Error) {
+            ctx.fireExceptionCaught(cause);
+            return;
+        }
+
+        Integer activeStreams = getActiveStreamCount(ctx);
+        boolean channelActive = ctx.channel().isActive();
+
+        if ((activeStreams != null && activeStreams == 0) || !channelActive) {
+            // No active streams OR channel already inactive: exception is noise
+            // (e.g., TCP RST from LB idle timeout, post-close cleanup).
+            if (logger.isDebugEnabled()) {
+                logger.debug(
+                    "Exception on HTTP/2 parent connection"
+                        + " [channel=" + ctx.channel()
+                        + ", activeStreams=" + activeStreams
+                        + ", channelActive=" + channelActive
+                        + ", clientVmId=" + ClientTelemetry.getCachedMachineId() + "]",
+                    cause);
+            }
+        } else {
+            // Active streams on a live channel, or stream count unknown (null):
+            // exception may affect in-flight requests.
+            logger.warn(
+                "Exception on HTTP/2 parent connection"
+                    + " [channel=" + ctx.channel()
+                    + ", activeStreams=" + activeStreams
+                    + ", channelActive=" + channelActive
+                    + ", clientVmId=" + ClientTelemetry.getCachedMachineId() + "]",
+                cause);
+        }
+    }
+
+    private static Integer getActiveStreamCount(ChannelHandlerContext ctx) {
+        try {
+            Http2FrameCodec codec = ctx.pipeline().get(Http2FrameCodec.class);
+            if (codec != null) {
+                return codec.connection().numActiveStreams();
+            }
+        } catch (Exception e) {
+            logger.debug("Failed to retrieve active stream count from Http2FrameCodec", e);
+        }
+        return null;
+    }
+}
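
The branch structure of `exceptionCaught` above can be restated as a small pure function, which makes the null (unknown stream count) case easier to reason about. The following is a minimal sketch in plain Java with no Netty dependency. `ParentChannelLogPolicy`, `Level`, and `decide` are hypothetical names introduced for illustration and are not part of the commit.

```java
/** Hypothetical, Netty-free restatement of the handler's log-level decision. */
final class ParentChannelLogPolicy {

    /** What happens to a Throwable seen on the parent channel. */
    enum Level { PROPAGATE, DEBUG, WARN }

    private ParentChannelLogPolicy() {
    }

    /**
     * @param activeStreams number of active HTTP/2 streams, or null if it could not be determined
     * @param channelActive whether the parent channel is still active
     * @param cause         the Throwable delivered to exceptionCaught
     */
    static Level decide(Integer activeStreams, boolean channelActive, Throwable cause) {
        if (cause instanceof Error) {
            // Errors are forwarded to the pipeline tail, never consumed.
            return Level.PROPAGATE;
        }
        if ((activeStreams != null && activeStreams == 0) || !channelActive) {
            // Idle or already-inactive connection: no in-flight request is affected.
            return Level.DEBUG;
        }
        // Active streams on a live channel, or stream count unknown on a live channel.
        return Level.WARN;
    }
}
```

Under this sketch an unknown stream count (`null`) on an active channel yields `WARN`, while the same unknown count on an inactive channel yields `DEBUG`, because the `!channelActive` side of the OR is sufficient on its own.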
