-
Notifications
You must be signed in to change notification settings - Fork 2.2k
[WIP]: fix(cosmos): HTTP/2 PING handler must observe child-stream read activity, not just parent-pipeline frames #49398
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,6 +16,7 @@ | |
| import java.lang.reflect.Method; | ||
| import java.util.concurrent.ScheduledFuture; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
|
|
||
| import static org.assertj.core.api.Assertions.assertThat; | ||
|
|
||
|
|
@@ -25,7 +26,7 @@ | |
| * Covers state transitions that do not require advancing real time -- the handler's | ||
| * timeout / threshold logic reads {@code System.nanoTime()}, so the tests sidestep | ||
| * the clock by pre-setting {@code pingOutstandingSinceNanos} (or | ||
| * {@code lastActivityNanos}) via reflection: | ||
| * {@code lastReadActivityNanos}) via reflection: | ||
| * <ul> | ||
| * <li>PING ACK with matching payload resets the failure counter (RFC 9113 §6.7 payload echo).</li> | ||
| * <li>PING ACK with mismatched payload is ignored (late ACK after timeout cannot mask degradation).</li> | ||
|
|
@@ -209,10 +210,14 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) | |
|
|
||
| ChannelHandlerContext ctx = channel.pipeline().context(handler); | ||
| long longAgo = System.nanoTime() - TimeUnit.SECONDS.toNanos(60); | ||
| AtomicLong childRead = channel.attr(Http2PingHandler.LAST_CHILD_STREAM_READ_ACTIVITY_NANOS).get(); | ||
|
|
||
| // First tick: force idle -> PING send attempt -> write fails. | ||
| // Expect consecutiveFailures=1, channel still open, task still scheduled. | ||
| setField(handler, "lastActivityNanos", longAgo); | ||
| // Rewind both lastReadActivityNanos AND the child-stream-read-activity attribute | ||
| // (seeded in handlerAdded to now()) so effectiveLastReadActivityNanos is stale. | ||
| setField(handler, "lastReadActivityNanos", longAgo); | ||
| childRead.set(longAgo); | ||
| invokeMaybeSendPing(handler, ctx); | ||
| channel.runPendingTasks(); // run the write + listener on the embedded event loop | ||
|
|
||
|
|
@@ -222,7 +227,12 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) | |
| assertThat((Object) getField(handler, "pingTask")).isNotNull(); | ||
|
|
||
| // Second tick: write fails again -> threshold reached -> task cancelled, channel closed. | ||
| setField(handler, "lastActivityNanos", longAgo); | ||
| // Also rewind lastPingSendNanos so the send-throttle (added with the child-stream | ||
| // activity fix to prevent rapid-fire PING storms after write failure) does not | ||
| // block the second send within the same pingInterval. | ||
| setField(handler, "lastReadActivityNanos", longAgo); | ||
| setField(handler, "lastPingSendNanos", longAgo); | ||
| childRead.set(longAgo); | ||
| invokeMaybeSendPing(handler, ctx); | ||
| channel.runPendingTasks(); | ||
|
|
||
|
|
@@ -247,10 +257,17 @@ public void ackTimeout_incrementsConsecutiveFailuresAndClosesAtThreshold() throw | |
| ChannelHandlerContext ctx = channel.pipeline().context(handler); | ||
|
|
||
| long pastTimeout = System.nanoTime() - TimeUnit.SECONDS.toNanos(5); | ||
| long evenOlder = pastTimeout - TimeUnit.SECONDS.toNanos(1); | ||
|
|
||
| // First tick: outstanding PING has aged past timeout -> failures=1, channel still open. | ||
| // Also rewind lastReadActivityNanos AND the child-stream-read-activity attribute to | ||
| // BEFORE the outstanding PING so the new "activity after outstanding PING" early-return | ||
| // (which would otherwise clear the outstanding state) does not fire. | ||
| setField(handler, "pingsSent", 1); | ||
| setField(handler, "pingOutstandingSinceNanos", pastTimeout); | ||
| setField(handler, "lastReadActivityNanos", evenOlder); | ||
| AtomicLong childRead = channel.attr(Http2PingHandler.LAST_CHILD_STREAM_READ_ACTIVITY_NANOS).get(); | ||
| childRead.set(evenOlder); | ||
| invokeMaybeSendPing(handler, ctx); | ||
| channel.runPendingTasks(); | ||
|
|
||
|
|
@@ -263,6 +280,8 @@ public void ackTimeout_incrementsConsecutiveFailuresAndClosesAtThreshold() throw | |
| // -> failures=threshold -> task cancelled, channel closed. | ||
| setField(handler, "pingsSent", 2); | ||
| setField(handler, "pingOutstandingSinceNanos", pastTimeout); | ||
| setField(handler, "lastReadActivityNanos", evenOlder); | ||
| childRead.set(evenOlder); | ||
| invokeMaybeSendPing(handler, ctx); | ||
| channel.runPendingTasks(); | ||
|
|
||
|
|
@@ -274,6 +293,100 @@ public void ackTimeout_incrementsConsecutiveFailuresAndClosesAtThreshold() throw | |
| channel.finishAndReleaseAll(); | ||
| } | ||
|
|
||
| @Test(groups = "unit") | ||
| public void childStreamReadActivity_suppressesIdlePing() throws Exception { | ||
|
Member
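There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

🟡 Recommendation — Test Gap: New tests bypass `Http2PingCloseRewrapHandler`. Both new tests (`childStreamReadActivity_suppressesIdlePing` and `childStreamReadActivityAfterOutstandingPing_resetsFailureState`) set the `LAST_CHILD_STREAM_READ_ACTIVITY_NANOS` attribute directly instead of exercising `Http2PingCloseRewrapHandler.channelReadComplete`. This means: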
Consider adding a test that constructs a parent |
||
| // Regression test for the v4.81.0-beta.1 fix: with parent-pipeline reads | ||
| // stale (would breach the idle threshold) but child-stream reads recent | ||
| // (via the LAST_CHILD_STREAM_READ_ACTIVITY_NANOS attribute updated by | ||
| // Http2PingCloseRewrapHandler.channelReadComplete on each H2 child stream), | ||
| // the handler must NOT send a PING. | ||
| // | ||
| // Without the fix (Http2PingHandler reading only lastReadActivityNanos), a | ||
| // saturated connection serving thousands of QPS via H2 child streams would | ||
| // still look "idle" to the handler -- because HEADERS/DATA frames are | ||
| // demuxed by Http2MultiplexHandler and never surface on the parent's | ||
| // channelRead -- and PINGs would fire on every interval, adding measurable | ||
| // latency overhead under load (the symptom that motivated this PR). | ||
| Http2PingHandler handler = new Http2PingHandler(1, 1, 3); | ||
| EmbeddedChannel channel = new EmbeddedChannel(handler); | ||
| ChannelHandlerContext ctx = channel.pipeline().context(handler); | ||
|
|
||
| // Parent-pipeline read-timestamp is stale (60s ago). | ||
| long longAgo = System.nanoTime() - TimeUnit.SECONDS.toNanos(60); | ||
| setField(handler, "lastReadActivityNanos", longAgo); | ||
|
|
||
| // Child-stream read-activity attribute is fresh: simulate Http2PingCloseRewrapHandler | ||
| // having just stamped the parent attribute after demuxing an inbound HEADERS/DATA batch. | ||
| AtomicLong childReadActivity = channel.attr(Http2PingHandler.LAST_CHILD_STREAM_READ_ACTIVITY_NANOS).get(); | ||
| assertThat(childReadActivity).as("handlerAdded should have seeded the child-read-activity attribute").isNotNull(); | ||
| childReadActivity.set(System.nanoTime()); | ||
|
|
||
| invokeMaybeSendPing(handler, ctx); | ||
| channel.runPendingTasks(); | ||
|
|
||
| // PING must NOT have fired: effectiveLastReadActivityNanos = max(stale, fresh) = fresh. | ||
| assertThat((int) getField(handler, "pingsSent")) | ||
| .as("recent child-stream reads must suppress idle-PING") | ||
| .isZero(); | ||
| assertThat((long) getField(handler, "pingOutstandingSinceNanos")).isZero(); | ||
| assertThat(channel.isOpen()).isTrue(); | ||
|
|
||
| channel.finishAndReleaseAll(); | ||
| } | ||
|
|
||
| @Test(groups = "unit") | ||
| public void childStreamReadActivityAfterOutstandingPing_resetsFailureState() throws Exception { | ||
| // Regression test for the v4.81.0-beta.1 fix: when a PING is outstanding | ||
| // (pingOutstandingSinceNanos != 0) AND child-stream reads flow AFTER the | ||
| // PING was sent, the handler must clear the outstanding state and reset | ||
| // consecutiveFailures -- the connection is demonstrably healthy (H2 codec is | ||
| // still delivering application data), even though the PING ACK itself was | ||
| // never observed on the parent pipeline. | ||
| // | ||
| // Without the fix, a middlebox that drops PING-ACK frames (but lets | ||
| // application H2 frames through) would accumulate consecutiveFailures up to | ||
| // the threshold and close a healthy, actively-serving connection. | ||
| Http2PingHandler handler = new Http2PingHandler(1, 1, 3); | ||
| EmbeddedChannel channel = new EmbeddedChannel(handler); | ||
| ChannelHandlerContext ctx = channel.pipeline().context(handler); | ||
|
|
||
| // PING sent 5s ago is still outstanding; 2 prior consecutive failures recorded. | ||
| long pingSentAt = System.nanoTime() - TimeUnit.SECONDS.toNanos(5); | ||
| setField(handler, "pingsSent", 7); | ||
| setField(handler, "pingOutstandingSinceNanos", pingSentAt); | ||
| setField(handler, "consecutiveFailures", 2); | ||
|
|
||
| // Parent-pipeline read-timestamp is even OLDER than the outstanding PING so | ||
| // it cannot itself trigger the early-return -- proving the reset is driven | ||
| // by the child-stream attribute specifically. | ||
| long parentLastRead = System.nanoTime() - TimeUnit.SECONDS.toNanos(10); | ||
| setField(handler, "lastReadActivityNanos", parentLastRead); | ||
|
|
||
| // Child-stream read-activity stamped AFTER the PING was sent (Http2PingCloseRewrapHandler | ||
| // saw inbound HEADERS/DATA on a child stream just now). | ||
| AtomicLong childReadActivity = channel.attr(Http2PingHandler.LAST_CHILD_STREAM_READ_ACTIVITY_NANOS).get(); | ||
| assertThat(childReadActivity).isNotNull(); | ||
| childReadActivity.set(System.nanoTime()); | ||
|
|
||
| invokeMaybeSendPing(handler, ctx); | ||
| channel.runPendingTasks(); | ||
|
|
||
| // Early-return cleared outstanding state and the failure counter; no new | ||
| // PING was sent; channel is still healthy and open. | ||
| assertThat((long) getField(handler, "pingOutstandingSinceNanos")) | ||
| .as("child-stream reads after outstanding PING must clear pingOutstandingSinceNanos") | ||
| .isZero(); | ||
| assertThat((int) getField(handler, "consecutiveFailures")) | ||
| .as("child-stream reads after outstanding PING must reset consecutiveFailures") | ||
| .isZero(); | ||
| assertThat((int) getField(handler, "pingsSent")) | ||
| .as("early-return must NOT send a new PING") | ||
| .isEqualTo(7); | ||
| assertThat(channel.isOpen()).isTrue(); | ||
|
|
||
| channel.finishAndReleaseAll(); | ||
| } | ||
|
|
||
| private static void invokeMaybeSendPing(Http2PingHandler handler, ChannelHandlerContext ctx) throws Exception { | ||
| Method m = Http2PingHandler.class.getDeclaredMethod("maybeSendPing", ChannelHandlerContext.class); | ||
| m.setAccessible(true); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🟡 Recommendation — Test Gap: `lastPingSendNanos` send-throttle is tested by bypassing it, never by triggering it

The `lastPingSendNanos` field exists to prevent rapid-fire PING storms after write failure. This test proves the multi-tick failure path, but explicitly rewinds `lastPingSendNanos` to `longAgo` to bypass the throttle on the second tick (the comment at line 231-233 even acknowledges this).

No test ever asserts the converse: that a second call to `maybeSendPing` within the same `pingIntervalNanos` window (without rewinding `lastPingSendNanos`) does NOT send a second PING.

If `lastPingSendNanos` were accidentally removed from the `idleAnchor` computation at line 264 (`long idleAnchor = Math.max(effectiveLastReadActivityNanos(ctx), lastPingSendNanos)`), the PING-storm defense would disappear and all tests would still pass.

Consider adding a `writeFailure_throttlesRapidResend` test:
- Set `consecutiveFailures = 1`, `lastPingSendNanos = now`
- Rewind `lastReadActivityNanos` and child attr to stale, but do not rewind `lastPingSendNanos`
- Assert `pingsSent` and `consecutiveFailures` did not advance (throttle held)