@@ -16,6 +16,7 @@
import java.lang.reflect.Method;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.assertj.core.api.Assertions.assertThat;

@@ -25,7 +26,7 @@
* Covers state transitions that do not require advancing real time -- the handler's
* timeout / threshold logic reads {@code System.nanoTime()}, so the tests sidestep
* the clock by pre-setting {@code pingOutstandingSinceNanos} (or
* {@code lastActivityNanos}) via reflection:
* {@code lastReadActivityNanos}) via reflection:
* <ul>
* <li>PING ACK with matching payload resets the failure counter (RFC 9113 §6.7 payload echo).</li>
* <li>PING ACK with mismatched payload is ignored (late ACK after timeout cannot mask degradation).</li>
@@ -209,10 +210,14 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)

ChannelHandlerContext ctx = channel.pipeline().context(handler);
long longAgo = System.nanoTime() - TimeUnit.SECONDS.toNanos(60);
AtomicLong childRead = channel.attr(Http2PingHandler.LAST_CHILD_STREAM_READ_ACTIVITY_NANOS).get();

// First tick: force idle -> PING send attempt -> write fails.
// Expect consecutiveFailures=1, channel still open, task still scheduled.
setField(handler, "lastActivityNanos", longAgo);
// Rewind both lastReadActivityNanos AND the child-stream-read-activity attribute
// (seeded in handlerAdded to now()) so effectiveLastReadActivityNanos is stale.
setField(handler, "lastReadActivityNanos", longAgo);
childRead.set(longAgo);
invokeMaybeSendPing(handler, ctx);
channel.runPendingTasks(); // run the write + listener on the embedded event loop

@@ -222,7 +227,12 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
assertThat((Object) getField(handler, "pingTask")).isNotNull();

// Second tick: write fails again -> threshold reached -> task cancelled, channel closed.
setField(handler, "lastActivityNanos", longAgo);
// Also rewind lastPingSendNanos so the send-throttle (added with the child-stream
// activity fix to prevent rapid-fire PING storms after write failure) does not
// block the second send within the same pingInterval.
setField(handler, "lastReadActivityNanos", longAgo);
setField(handler, "lastPingSendNanos", longAgo);


🟡 Recommendation — Test Gap: lastPingSendNanos send-throttle is tested by bypassing it, never by triggering it

The lastPingSendNanos field exists to prevent rapid-fire PING storms after write failure. This test proves the multi-tick failure path, but it explicitly rewinds lastPingSendNanos to longAgo to bypass the throttle on the second tick (the comment at lines 231-233 even acknowledges this).

No test ever asserts the converse: that a second call to maybeSendPing within the same pingIntervalNanos window (without rewinding lastPingSendNanos) does NOT send a second PING.

If lastPingSendNanos were accidentally removed from the idleAnchor computation at line 264 (long idleAnchor = Math.max(effectiveLastReadActivityNanos(ctx), lastPingSendNanos)), the PING-storm defense would disappear and all tests would still pass.

Consider adding a writeFailure_throttlesRapidResend test:

  1. First tick: write fails, consecutiveFailures = 1, lastPingSendNanos = now
  2. Rewind lastReadActivityNanos and child attr to stale, but do not rewind lastPingSendNanos
  3. Second tick: assert pingsSent and consecutiveFailures did not advance (throttle held)
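
The steps above can be sketched against a simplified model of the idle check. This is a minimal, stdlib-only stand-in, not the real Http2PingHandler: the class name `PingThrottleSketch`, the explicit `nowNanos` parameter, and the assumption that `pingsSent` counts failed attempts are all hypothetical. Only the `idleAnchor = max(lastReadActivityNanos, lastPingSendNanos)` rule is taken from the PR. A real test would drive the handler through an EmbeddedChannel and reflection like the existing tests do.

```java
/**
 * Simplified, hypothetical stand-in for the handler's idle check.
 * NOT the real Http2PingHandler; it only models the idleAnchor rule.
 */
final class PingThrottleSketch {
    private final long pingIntervalNanos;
    long lastReadActivityNanos; // assumed mirror of the handler field
    long lastPingSendNanos;     // assumed to be stamped on every send attempt
    int pingsSent;              // assumed to count attempts, including failed writes
    int consecutiveFailures;

    PingThrottleSketch(long pingIntervalNanos, long initialNanos) {
        this.pingIntervalNanos = pingIntervalNanos;
        this.lastReadActivityNanos = initialNanos;
        this.lastPingSendNanos = initialNanos;
    }

    /** One scheduler tick; writeFails simulates a failing outbound write. */
    void maybeSendPing(long nowNanos, boolean writeFails) {
        long idleAnchor = Math.max(lastReadActivityNanos, lastPingSendNanos);
        if (nowNanos - idleAnchor < pingIntervalNanos) {
            return; // recent read activity, or a PING was attempted too recently
        }
        lastPingSendNanos = nowNanos;
        pingsSent++;
        if (writeFails) {
            consecutiveFailures++;
        }
    }

    public static void main(String[] args) {
        long second = 1_000_000_000L;
        long now = 1_000L * second;
        long longAgo = now - 60L * second;

        PingThrottleSketch handler = new PingThrottleSketch(second, longAgo);
        handler.maybeSendPing(now, true);              // step 1: write fails
        handler.lastReadActivityNanos = longAgo;       // step 2: reads stale, send stamp untouched
        handler.maybeSendPing(now + 1_000_000L, true); // step 3: 1 ms later, inside the interval

        System.out.println("pingsSent=" + handler.pingsSent
            + " consecutiveFailures=" + handler.consecutiveFailures);
    }
}
```

If `lastPingSendNanos` were dropped from the `Math.max` in this model, the second tick would send again and both counters would advance, which is the regression the proposed test is meant to catch.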

⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.

childRead.set(longAgo);
invokeMaybeSendPing(handler, ctx);
channel.runPendingTasks();

@@ -247,10 +257,17 @@ public void ackTimeout_incrementsConsecutiveFailuresAndClosesAtThreshold() throw
ChannelHandlerContext ctx = channel.pipeline().context(handler);

long pastTimeout = System.nanoTime() - TimeUnit.SECONDS.toNanos(5);
long evenOlder = pastTimeout - TimeUnit.SECONDS.toNanos(1);

// First tick: outstanding PING has aged past timeout -> failures=1, channel still open.
// Also rewind lastReadActivityNanos AND the child-stream-read-activity attribute to
// BEFORE the outstanding PING so the new "activity after outstanding PING" early-return
// (which would otherwise clear the outstanding state) does not fire.
setField(handler, "pingsSent", 1);
setField(handler, "pingOutstandingSinceNanos", pastTimeout);
setField(handler, "lastReadActivityNanos", evenOlder);
AtomicLong childRead = channel.attr(Http2PingHandler.LAST_CHILD_STREAM_READ_ACTIVITY_NANOS).get();
childRead.set(evenOlder);
invokeMaybeSendPing(handler, ctx);
channel.runPendingTasks();

@@ -263,6 +280,8 @@ public void ackTimeout_incrementsConsecutiveFailuresAndClosesAtThreshold() throw
// -> failures=threshold -> task cancelled, channel closed.
setField(handler, "pingsSent", 2);
setField(handler, "pingOutstandingSinceNanos", pastTimeout);
setField(handler, "lastReadActivityNanos", evenOlder);
childRead.set(evenOlder);
invokeMaybeSendPing(handler, ctx);
channel.runPendingTasks();

@@ -274,6 +293,100 @@ public void ackTimeout_incrementsConsecutiveFailuresAndClosesAtThreshold() throw
channel.finishAndReleaseAll();
}

@Test(groups = "unit")
public void childStreamReadActivity_suppressesIdlePing() throws Exception {


🟡 Recommendation — Test Gap: New tests bypass Http2PingCloseRewrapHandler.channelReadComplete entirely

Both new tests (childStreamReadActivity_suppressesIdlePing and childStreamReadActivityAfterOutstandingPing_resetsFailureState) set the LAST_CHILD_STREAM_READ_ACTIVITY_NANOS attribute directly via AtomicLong.set(). They never exercise the actual production code path in Http2PingCloseRewrapHandler.channelReadComplete(), i.e. the parent.attr(...).get().accumulateAndGet(System.nanoTime(), Math::max) path.

This means:

  • If channelReadComplete used the wrong attribute key, called set(0) instead of accumulateAndGet(now, max), or accidentally omitted the super.channelReadComplete(ctx) call, all tests would still pass.
  • The monotonic guarantee (Math::max) for concurrent child streams is never verified.
  • Combined with the doOnConnected wiring issue above, the entire child-stream-to-parent signaling path has zero automated verification.

Consider adding a test that constructs a parent EmbeddedChannel (with Http2PingHandler) and a child EmbeddedChannel (with Http2PingCloseRewrapHandler.INSTANCE), fires channelReadComplete on the child, and asserts the parent's LAST_CHILD_STREAM_READ_ACTIVITY_NANOS attribute advanced.
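
To make the monotonic point concrete, here is a minimal stdlib-only sketch of the stamping rule in isolation. The class and method names (`MonotonicStampSketch`, `stamp`, `effectiveLastRead`) are hypothetical. The `accumulateAndGet(now, Math::max)` call and the `max(parent, child)` combination are the ones described in the PR. This does not replace the parent/child EmbeddedChannel test suggested above, since it exercises neither the attribute key nor the handler wiring.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper isolating the timestamp rules; not part of the SDK. */
final class MonotonicStampSketch {

    /** Same update the PR uses in channelReadComplete: the stamp never moves backwards. */
    static long stamp(AtomicLong holder, long nowNanos) {
        return holder.accumulateAndGet(nowNanos, Math::max);
    }

    /** Combination the tests describe as max(parent read stamp, child read stamp). */
    static long effectiveLastRead(long parentLastReadNanos, AtomicLong childHolder) {
        return Math.max(parentLastReadNanos, childHolder.get());
    }

    public static void main(String[] args) {
        AtomicLong holder = new AtomicLong(100L);
        stamp(holder, 250L); // a child stream reports a newer read
        stamp(holder, 200L); // a sibling reports an older read; the stamp must not regress
        System.out.println("holder=" + holder.get()
            + " effective=" + effectiveLastRead(50L, holder));
    }
}
```

A plain `set(now)` would let the second, older stamp overwrite the newer one. That is the behavioral difference an assertion on the parent attribute could detect.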

⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.

// Regression test for the v4.81.0-beta.1 fix: with parent-pipeline reads
// stale (would breach the idle threshold) but child-stream reads recent
// (via the LAST_CHILD_STREAM_READ_ACTIVITY_NANOS attribute updated by
// Http2PingCloseRewrapHandler.channelReadComplete on each H2 child stream),
// the handler must NOT send a PING.
//
// Without the fix (Http2PingHandler reading only lastReadActivityNanos), a
// saturated connection serving thousands of QPS via H2 child streams would
// still look "idle" to the handler -- because HEADERS/DATA frames are
// demuxed by Http2MultiplexHandler and never surface on the parent's
// channelRead -- and PINGs would fire on every interval, adding measurable
// latency overhead under load (the symptom that motivated this PR).
Http2PingHandler handler = new Http2PingHandler(1, 1, 3);
EmbeddedChannel channel = new EmbeddedChannel(handler);
ChannelHandlerContext ctx = channel.pipeline().context(handler);

// Parent-pipeline read-timestamp is stale (60s ago).
long longAgo = System.nanoTime() - TimeUnit.SECONDS.toNanos(60);
setField(handler, "lastReadActivityNanos", longAgo);

// Child-stream read-activity attribute is fresh: simulate Http2PingCloseRewrapHandler
// having just stamped the parent attribute after demuxing an inbound HEADERS/DATA batch.
AtomicLong childReadActivity = channel.attr(Http2PingHandler.LAST_CHILD_STREAM_READ_ACTIVITY_NANOS).get();
assertThat(childReadActivity).as("handlerAdded should have seeded the child-read-activity attribute").isNotNull();
childReadActivity.set(System.nanoTime());

invokeMaybeSendPing(handler, ctx);
channel.runPendingTasks();

// PING must NOT have fired: effectiveLastReadActivityNanos = max(stale, fresh) = fresh.
assertThat((int) getField(handler, "pingsSent"))
.as("recent child-stream reads must suppress idle-PING")
.isZero();
assertThat((long) getField(handler, "pingOutstandingSinceNanos")).isZero();
assertThat(channel.isOpen()).isTrue();

channel.finishAndReleaseAll();
}

@Test(groups = "unit")
public void childStreamReadActivityAfterOutstandingPing_resetsFailureState() throws Exception {
// Regression test for the v4.81.0-beta.1 fix: when a PING is outstanding
// (pingOutstandingSinceNanos != 0) AND child-stream reads flow AFTER the
// PING was sent, the handler must clear the outstanding state and reset
// consecutiveFailures -- the connection is demonstrably healthy (H2 codec is
// still delivering application data), even though the PING ACK itself was
// never observed on the parent pipeline.
//
// Without the fix, a middlebox that drops PING-ACK frames (but lets
// application H2 frames through) would accumulate consecutiveFailures up to
// the threshold and close a healthy, actively-serving connection.
Http2PingHandler handler = new Http2PingHandler(1, 1, 3);
EmbeddedChannel channel = new EmbeddedChannel(handler);
ChannelHandlerContext ctx = channel.pipeline().context(handler);

// PING sent 5s ago is still outstanding; 2 prior consecutive failures recorded.
long pingSentAt = System.nanoTime() - TimeUnit.SECONDS.toNanos(5);
setField(handler, "pingsSent", 7);
setField(handler, "pingOutstandingSinceNanos", pingSentAt);
setField(handler, "consecutiveFailures", 2);

// Parent-pipeline read-timestamp is even OLDER than the outstanding PING so
// it cannot itself trigger the early-return -- proving the reset is driven
// by the child-stream attribute specifically.
long parentLastRead = System.nanoTime() - TimeUnit.SECONDS.toNanos(10);
setField(handler, "lastReadActivityNanos", parentLastRead);

// Child-stream read-activity stamped AFTER the PING was sent (Http2PingCloseRewrapHandler
// saw inbound HEADERS/DATA on a child stream just now).
AtomicLong childReadActivity = channel.attr(Http2PingHandler.LAST_CHILD_STREAM_READ_ACTIVITY_NANOS).get();
assertThat(childReadActivity).isNotNull();
childReadActivity.set(System.nanoTime());

invokeMaybeSendPing(handler, ctx);
channel.runPendingTasks();

// Early-return cleared outstanding state and the failure counter; no new
// PING was sent; channel is still healthy and open.
assertThat((long) getField(handler, "pingOutstandingSinceNanos"))
.as("child-stream reads after outstanding PING must clear pingOutstandingSinceNanos")
.isZero();
assertThat((int) getField(handler, "consecutiveFailures"))
.as("child-stream reads after outstanding PING must reset consecutiveFailures")
.isZero();
assertThat((int) getField(handler, "pingsSent"))
.as("early-return must NOT send a new PING")
.isEqualTo(7);
assertThat(channel.isOpen()).isTrue();

channel.finishAndReleaseAll();
}

private static void invokeMaybeSendPing(Http2PingHandler handler, ChannelHandlerContext ctx) throws Exception {
Method m = Http2PingHandler.class.getDeclaredMethod("maybeSendPing", ChannelHandlerContext.class);
m.setAccessible(true);
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
@@ -12,6 +12,7 @@
#### Bugs Fixed
* Fixed region name normalization for preferred and excluded regions — non-canonical inputs (e.g., `"westus3"`, `"WEST US 3"`) are now mapped to the canonical form. Also fixed a case-sensitive exclude-region check in PPCB reevaluate logic. - See [PR 49090](https://github.com/Azure/azure-sdk-for-java/pull/49090)
* Fixed `UnsupportedOperationException` when using `readManyByPartitionKeys` for empty pages. - See [PR 49311](https://github.com/Azure/azure-sdk-for-java/pull/49311)
* Fixed HTTP/2 PING keepalive handler (introduced in [PR 49095](https://github.com/Azure/azure-sdk-for-java/pull/49095)) so it observes child-stream HEADERS/DATA reads via `Http2PingCloseRewrapHandler.channelReadComplete`, preventing spurious PINGs (and spurious closes) on connections actively serving requests through `Http2MultiplexHandler`.

#### Other Changes
* Added HTTP/2 PING keepalive (default ON) for Gateway service endpoints to detect silently-broken connections. - See [PR 49095](https://github.com/Azure/azure-sdk-for-java/pull/49095)
@@ -7,6 +7,8 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.atomic.AtomicLong;

/**
* Per-request HTTP/2 child-stream handler that translates a parent-TCP-channel close
* driven by {@link Http2PingHandler} into a typed {@link Http2PingTimeoutChannelClosedException}.
@@ -40,6 +42,46 @@ final class Http2PingCloseRewrapHandler extends ChannelInboundHandlerAdapter {

private Http2PingCloseRewrapHandler() {}

/**
* Records inbound H2 stream reads on the parent channel so the PING handler
* can recognize a busy connection as non-idle and skip the PING send.
* <p>
* {@code channelReadComplete} fires at the end of each inbound read cycle on
* the child channel -- so this method stamps the parent attribute exactly once
* per batch of HEADERS/DATA frames demuxed by {@link io.netty.handler.codec.http2.Http2MultiplexHandler},
* not once per frame. Under normal load this is plenty of granularity to
* suppress unnecessary PINGs without becoming a hot-path allocator.
* <p>
* Edge cases:
* <ul>
* <li><b>Backpressure / autoRead off</b>: {@link io.netty.handler.codec.http2.Http2MultiplexHandler} may
* buffer frames in the child's inbound queue and defer dispatch. In that
* case {@code channelReadComplete} may fire later than the wire-arrival
* time, but it WILL fire when the read cycle ends. Worst case: one benign
* extra PING during the brief buffering window.</li>
* <li><b>Non-H2 channels</b>: this handler is only installed on H2 child
* streams (where {@code ch.parent() != null}), so the parent reference
* above is non-null in normal operation. Defensive null-check kept for
* safety if the install topology ever changes.</li>
* </ul>
* Uses {@code accumulateAndGet(now, Math::max)} so concurrent updates from
* sibling child streams remain monotonic (in practice child streams sharing a
* parent run on the same event loop, so contention is nil; the monotonic
* semantics document intent and future-proof against multi-event-loop
* scenarios).
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
Channel parent = ctx.channel().parent();
if (parent != null) {
AtomicLong holder = parent.attr(Http2PingHandler.LAST_CHILD_STREAM_READ_ACTIVITY_NANOS).get();
if (holder != null) {
holder.accumulateAndGet(System.nanoTime(), Math::max);
}
}
super.channelReadComplete(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel parent = ctx.channel().parent();