Skip to content

Commit 4aa0256

Browse files
dahyvuunrobobarioSamBarker
authored
fix(routing): aggregate backend writability for client backpressure (kroxylicious#4238)
* fix(routing): aggregate backend writability for client backpressure Signed-off-by: dahyvuun <dahyvuun@gmail.com> * chore: format ClientConnectionStateMachineTest Signed-off-by: dahyvuun <dahyvuun@gmail.com> * Fix isWritable() to track actual server channel writability serverReadsBlocked only tracked client-side congestion, so isWritable() never reflected the real state of the server channel, making the multi-backend backpressure guard a no-op. Add a dedicated serverChannelWritable field, set from onServerWritable()/onServerUnwritable(), and have isWritable() return that instead. Assisted-by: Claude Signed-off-by: dahyvuun <dahyvuun@gmail.com> * chore: remove duplicate stub and document serverChannelWritable field Signed-off-by: dahyvuun <dahyvuun@gmail.com> * chore: extract metadataRequest() out of assertThatThrownBy lambda Signed-off-by: dahyvuun <dahyvuun@gmail.com> * chore: trigger CI Signed-off-by: dahyvuun <dahyvuun@gmail.com> * fix: add missing semicolon in ClientConnectionStateMachineTest Signed-off-by: dahyvuun <dahyvuun@gmail.com> * fix: restore assertion chain in forwardToRouteShouldTransitionToClosedIfScsmCreationFails Signed-off-by: dahyvuun <dahyvuun@gmail.com> * fix: remove onClientRequest assertion merged from conflict resolution Signed-off-by: dahyvuun <dahyvuun@gmail.com> * docs(runtime): document backpressure contract on CCSM and SCSM The backpressure invariants were undocumented on both classes. Add the concrete rules to each class javadoc so the contract is explicit. Assisted-by: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk> --------- Signed-off-by: dahyvuun <dahyvuun@gmail.com> Signed-off-by: Dahyun Woo <164325611+dahyvuun@users.noreply.github.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk> Co-authored-by: Robert Young <robeyoun@redhat.com> Co-authored-by: Sam Barker <sam@quadrocket.co.uk>
1 parent 50f6b76 commit 4aa0256

4 files changed

Lines changed: 134 additions & 7 deletions

File tree

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/ClientConnectionStateMachine.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,15 @@
104104
* {@link ServerConnectionStateMachine}.</p>
105105
*
106106
* <p>
107-
* When either side of the proxy starts applying back pressure the proxy should propagate that fact to the other peer.
108-
* Thus, when the proxy is notified that a peer is applying back pressure it results in action on the channel with the opposite peer.
107+
* When either side of the proxy starts applying back pressure the proxy should propagate that fact to the other peer(s).
108+
* Thus, when the proxy is notified that any peer is applying back pressure it results in action on the channels with the opposite peer(s).
109+
* Concretely this means:
109110
* </p>
111+
* <ul>
112+
* <li>When any server channel becomes unwritable, client reads are paused (don't accept requests we can't forward).</li>
113+
* <li>Client reads resume only when all server channels are writable.</li>
114+
* <li>When the client channel becomes unwritable, reads are paused on all server channels (don't accept responses we can't deliver).</li>
115+
* </ul>
110116
*/
111117
@SuppressWarnings({ "java:S1133", "java:S1172" }) // S1172: scsm params on ServerConnectionStateMachine callbacks identify the caller for multi-backend routing
112118
public class ClientConnectionStateMachine {
@@ -368,12 +374,16 @@ void onServerUnwritable() {
368374
*/
369375
void onServerWritable() {
370376
if (clientReadsBlocked) {
371-
clientReadsBlocked = false;
372-
if (clientToProxyBackpressureTimer != null) {
373-
clientToProxyBackpressureTimer.stop(clientToProxyBackPressureMeter);
374-
clientToProxyBackpressureTimer = null;
377+
boolean allWritable = serverConnections.values().stream()
378+
.allMatch(ServerConnectionStateMachine::isWritable);
379+
if (allWritable) {
380+
clientReadsBlocked = false;
381+
if (clientToProxyBackpressureTimer != null) {
382+
clientToProxyBackpressureTimer.stop(clientToProxyBackPressureMeter);
383+
clientToProxyBackpressureTimer = null;
384+
}
385+
Objects.requireNonNull(frontendHandler).relieveBackpressure();
375386
}
376-
Objects.requireNonNull(frontendHandler).relieveBackpressure();
377387
}
378388
}
379389

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/ServerConnectionStateMachine.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,14 @@
6060
* concerns from the client session. The CCSM retains client-side state and delegates
6161
* server operations here.
6262
*
63+
* <p>This class participates in TCP backpressure in both directions. When either side of the proxy
64+
* starts applying back pressure the proxy should propagate that fact to the other peer.
65+
* Concretely this means:</p>
66+
* <ul>
67+
* <li>When the server channel becomes unwritable, client reads are paused (don't accept requests we can't forward).</li>
68+
* <li>When the client channel becomes unwritable, server reads are paused (don't accept responses we can't deliver).</li>
69+
* </ul>
70+
*
6371
* <pre>
6472
* Connecting ──→ Active ────────────→ Closed
6573
* │ │ │
@@ -89,6 +97,13 @@ int serverMessagesInFlightCount() {
8997
@VisibleForTesting
9098
boolean serverReadsBlocked;
9199

100+
/**
101+
* Tracks whether the server channel is writable.
102+
* When false, client reads are paused to apply backpressure.
103+
*/
104+
@VisibleForTesting
105+
boolean serverChannelWritable = true;
106+
92107
@VisibleForTesting
93108
@Nullable
94109
Timer.Sample serverBackpressureTimer;
@@ -394,10 +409,12 @@ void serverReadComplete() {
394409
}
395410

396411
void onServerUnwritable() {
412+
serverChannelWritable = false;
397413
ccsm.onServerUnwritable();
398414
}
399415

400416
void onServerWritable() {
417+
serverChannelWritable = true;
401418
ccsm.onServerWritable();
402419
}
403420

@@ -444,6 +461,10 @@ void relieveBackpressure() {
444461
}
445462
}
446463

464+
boolean isWritable() {
465+
return serverChannelWritable;
466+
}
467+
447468
void close() {
448469
if (!(state instanceof ServerConnectionState.Closed)) {
449470
toClosed();
@@ -491,6 +512,7 @@ public String toString() {
491512
return "ServerConnectionStateMachine{" +
492513
"state=" + state +
493514
", serverReadsBlocked=" + serverReadsBlocked +
515+
", serverChannelWritable=" + serverChannelWritable +
494516
", serverMessagesInFlightCount=" + serverMessagesInFlightCount +
495517
'}';
496518
}

kroxylicious-runtime/src/test/java/io/kroxylicious/proxy/internal/ClientConnectionStateMachineTest.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import static org.mockito.ArgumentMatchers.notNull;
7777
import static org.mockito.Mockito.doAnswer;
7878
import static org.mockito.Mockito.doNothing;
79+
import static org.mockito.Mockito.lenient;
7980
import static org.mockito.Mockito.mock;
8081
import static org.mockito.Mockito.never;
8182
import static org.mockito.Mockito.times;
@@ -127,6 +128,7 @@ void setUp() {
127128
when(frontendHandler.clientChannel()).thenReturn(mock(Channel.class));
128129
// Make the executor run tasks synchronously for tests
129130
when(frontendHandler.eventLoopExecutor()).thenReturn(Runnable::run);
131+
lenient().when(serverConnectionStateMachine.isWritable()).thenReturn(true);
130132
}
131133

132134
@AfterEach
@@ -1193,6 +1195,56 @@ void onClientWritableShouldRelieveBackpressureOnAllServerConnections() {
11931195
verify(scsm2).relieveBackpressure();
11941196
}
11951197

1198+
@Test
1199+
void shouldNotUnblockClientWhenOnlyOneBackendBecomesWritable() {
1200+
// Given
1201+
var scsm1 = mock(ServerConnectionStateMachine.class);
1202+
var scsm2 = mock(ServerConnectionStateMachine.class);
1203+
var addr1 = new HostPort("host1", 9092);
1204+
var addr2 = new HostPort("host2", 9092);
1205+
clientConnectionStateMachine.forceState(
1206+
new ClientConnectionState.Forwarding(),
1207+
frontendHandler,
1208+
Map.of(addr1, scsm1, addr2, scsm2),
1209+
TEST_KAFKA_SESSION,
1210+
true);
1211+
clientConnectionStateMachine.clientReadsBlocked = true;
1212+
lenient().when(scsm1.isWritable()).thenReturn(true);
1213+
when(scsm2.isWritable()).thenReturn(false);
1214+
1215+
// When
1216+
clientConnectionStateMachine.onServerWritable();
1217+
1218+
// Then
1219+
assertThat(clientConnectionStateMachine.clientReadsBlocked).isTrue();
1220+
verify(frontendHandler, never()).relieveBackpressure();
1221+
}
1222+
1223+
@Test
1224+
void shouldUnblockClientWhenAllBackendsWritable() {
1225+
// Given
1226+
var scsm1 = mock(ServerConnectionStateMachine.class);
1227+
var scsm2 = mock(ServerConnectionStateMachine.class);
1228+
var addr1 = new HostPort("host1", 9092);
1229+
var addr2 = new HostPort("host2", 9092);
1230+
clientConnectionStateMachine.forceState(
1231+
new ClientConnectionState.Forwarding(),
1232+
frontendHandler,
1233+
Map.of(addr1, scsm1, addr2, scsm2),
1234+
TEST_KAFKA_SESSION,
1235+
true);
1236+
clientConnectionStateMachine.clientReadsBlocked = true;
1237+
lenient().when(scsm1.isWritable()).thenReturn(true);
1238+
lenient().when(scsm2.isWritable()).thenReturn(true);
1239+
1240+
// When
1241+
clientConnectionStateMachine.onServerWritable();
1242+
1243+
// Then
1244+
assertThat(clientConnectionStateMachine.clientReadsBlocked).isFalse();
1245+
verify(frontendHandler).relieveBackpressure();
1246+
}
1247+
11961248
private int getVirtualNodeClientToProxyActiveConnections() {
11971249
return io.kroxylicious.proxy.internal.util.Metrics.clientToProxyConnectionCounter(VIRTUAL_CLUSTER_NODE).get();
11981250
}

kroxylicious-runtime/src/test/java/io/kroxylicious/proxy/internal/ServerConnectionStateMachineTest.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -746,6 +746,7 @@ void onServerUnwritableShouldDelegateToCcsm() {
746746
scsm.onServerUnwritable();
747747

748748
verify(ccsm).onServerUnwritable();
749+
assertThat(scsm.isWritable()).isFalse();
749750
}
750751

751752
@Test
@@ -757,6 +758,47 @@ void onServerWritableShouldDelegateToCcsm() {
757758
scsm.onServerWritable();
758759

759760
verify(ccsm).onServerWritable();
761+
assertThat(scsm.isWritable()).isTrue();
762+
}
763+
764+
// === isWritable() / serverChannelWritable tests ===
765+
766+
@Test
767+
void isWritableShouldDefaultToTrue() {
768+
var scsm = createScsm();
769+
770+
assertThat(scsm.isWritable()).isTrue();
771+
}
772+
773+
@Test
774+
void onServerUnwritableShouldSetWritableFalse() {
775+
var scsm = createScsm();
776+
777+
scsm.onServerUnwritable();
778+
779+
assertThat(scsm.isWritable()).isFalse();
780+
assertThat(scsm.serverChannelWritable).isFalse();
781+
}
782+
783+
@Test
784+
void onServerWritableShouldSetWritableTrue() {
785+
var scsm = createScsm();
786+
scsm.onServerUnwritable();
787+
788+
scsm.onServerWritable();
789+
790+
assertThat(scsm.isWritable()).isTrue();
791+
assertThat(scsm.serverChannelWritable).isTrue();
792+
}
793+
794+
@Test
795+
void isWritableShouldBeIndependentOfServerReadsBlocked() {
796+
var scsm = createScsm();
797+
798+
scsm.applyBackpressure();
799+
800+
assertThat(scsm.serverReadsBlocked).isTrue();
801+
assertThat(scsm.isWritable()).isTrue();
760802
}
761803

762804
// === In-flight count tests ===
@@ -812,6 +854,7 @@ void toStringShouldContainRelevantFields() {
812854
assertThat(scsm.toString())
813855
.contains("state=")
814856
.contains("serverReadsBlocked=")
857+
.contains("serverChannelWritable=")
815858
.contains("serverMessagesInFlightCount=");
816859
}
817860

0 commit comments

Comments
 (0)