Skip to content

Commit a6660e7

Browse files
authored
fix(runtime): ensure TransportSubjectBuilder callbacks run on Netty event loop (kroxylicious#3748)
Discovered during the investigation of kroxylicious#3745 When a TransportSubjectBuilder completes asynchronously on a non-Netty thread, the completion callback may update ClientSubjectManager.subject from a foreign thread, creating a race condition where the Netty event loop thread reads a stale value. The original code used whenComplete() without an executor, allowing the callback to run on whichever thread completed the CompletableFuture. Test plugin MyTransportSubjectBuilderService created unmanaged threads that completed futures after delays, triggering cross-thread writes. The race became observable after commit 25aa9d9 introduced shared Kafka clusters across test parameters, tightening timing windows. Pass the Netty event loop executor to ClientSubjectManager.subjectFromTransport() and use whenCompleteAsync(..., eventLoopExecutor) to ensure callbacks always execute on the channel's event loop thread. This provides: - Thread affinity: ClientSubjectManager state only modified on event loop - Happens-before guarantee: executor submission creates memory barrier - Netty best practices: don't mutate channel state from foreign threads Test code updated to use CompletableFuture.delayedExecutor() instead of unmanaged threads for cleaner async simulation. Assisted-by: Claude Sonnet 4.5 <noreply@anthropic.com> Signed-off-by: Keith Wall <kwall@apache.org>
1 parent 45afe24 commit a6660e7

9 files changed

Lines changed: 145 additions & 25 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ Format `<github issue/pr number>: <short description>`.
77

88
## SNAPSHOT
99

10+
* [#3745](https://github.com/kroxylicious/kroxylicious/issues/3745): fix(runtime): ensure async TransportSubjectBuilder callbacks execute on Netty event loop to prevent race conditions
1011
* [#3620](https://github.com/kroxylicious/kroxylicious/issues/3620): Removed Deprecated clientSaslAuthenticationSuccess from FilterContext
1112
* [#3624](https://github.com/kroxylicious/kroxylicious/pull/3624): feat(operator): set Kubernetes client User-Agent to `kroxylicious-operator/<version>` for API server audit log identification
1213
* [#3565](https://github.com/kroxylicious/kroxylicious/pull/3514): build(deps): bump kubernetes-client.version from 7.5.2 to 7.6.1

kroxylicious-integration-tests/src/test/java/io/kroxylicious/it/MyTransportSubjectBuilderService.java

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.Map;
1010
import java.util.concurrent.CompletableFuture;
1111
import java.util.concurrent.CompletionStage;
12+
import java.util.concurrent.TimeUnit;
1213

1314
import javax.security.auth.x500.X500Principal;
1415

@@ -56,22 +57,15 @@ CompletionStage<Subject> delayed(Subject subject) {
5657
}
5758
}
5859
else {
59-
var fut = new CompletableFuture<Subject>();
60-
new Thread(() -> {
61-
try {
62-
Thread.sleep(delayMs);
63-
}
64-
catch (InterruptedException e) {
65-
throw new RuntimeException(e);
66-
}
67-
if (completeSuccessfully) {
68-
fut.complete(subject);
69-
}
70-
else {
71-
fut.completeExceptionally(new RuntimeException("Oops"));
72-
}
73-
}).start();
74-
return fut;
60+
if (completeSuccessfully) {
61+
return CompletableFuture.supplyAsync(() -> subject,
62+
CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS));
63+
}
64+
else {
65+
return CompletableFuture.supplyAsync(() -> {
66+
throw new RuntimeException("Oops");
67+
}, CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS));
68+
}
7569
}
7670
}
7771
}

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/ClientSubjectManager.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.security.cert.X509Certificate;
1111
import java.util.Objects;
1212
import java.util.Optional;
13+
import java.util.concurrent.Executor;
1314

1415
import javax.net.ssl.SSLPeerUnverifiedException;
1516
import javax.net.ssl.SSLSession;
@@ -86,10 +87,11 @@ public class ClientSubjectManager implements
8687

8788
public void subjectFromTransport(@Nullable SSLSession session,
8889
TransportSubjectBuilder transportSubjectBuilder,
90+
Executor eventLoopExecutor,
8991
Runnable whenDoneCallback) {
9092
this.clientCertificate = peerTlsCertificate(session);
9193
this.proxyCertificate = localTlsCertificate(session);
92-
transportSubjectBuilder.buildTransportSubject(this).whenComplete((newSubject, error) -> {
94+
transportSubjectBuilder.buildTransportSubject(this).whenCompleteAsync((newSubject, error) -> {
9395
if (error == null) {
9496
this.subject = newSubject;
9597
}
@@ -101,7 +103,7 @@ public void subjectFromTransport(@Nullable SSLSession session,
101103
}
102104
this.mechanismName = null;
103105
whenDoneCallback.run();
104-
});
106+
}, eventLoopExecutor);
105107
}
106108

107109
void clientSaslAuthenticationSuccess(

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/KafkaProxyFrontendHandler.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.util.List;
1313
import java.util.Objects;
1414
import java.util.Optional;
15+
import java.util.concurrent.Executor;
1516
import java.util.concurrent.TimeUnit;
1617

1718
import javax.net.ssl.SSLEngine;
@@ -558,6 +559,10 @@ void unblockClient() {
558559
proxyChannelStateMachine.onClientWritable();
559560
}
560561

562+
Executor eventLoopExecutor() {
563+
return Objects.requireNonNull(clientCtx().executor(), "executor must not be null");
564+
}
565+
561566
private ChannelHandlerContext clientCtx() {
562567
return Objects.requireNonNull(this.clientCtx, "clientCtx was null while in state " + this.proxyChannelStateMachine.currentState());
563568
}

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/ProxyChannelStateMachine.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -581,7 +581,8 @@ public void clientSaslAuthenticationFailure() {
581581
}
582582

583583
public void onClientTlsHandshakeSuccess(SSLSession sslSession) {
584-
this.clientSubjectManager.subjectFromTransport(sslSession, transportSubjectBuilder, this::onTransportSubjectBuilt);
584+
this.clientSubjectManager.subjectFromTransport(sslSession, transportSubjectBuilder,
585+
Objects.requireNonNull(frontendHandler).eventLoopExecutor(), this::onTransportSubjectBuilt);
585586
}
586587

587588
@SuppressWarnings("java:S5738")
@@ -596,7 +597,8 @@ private void toClientActive(
596597
// these can happen in either order
597598
this.progressionLatch = 2;
598599
if (!this.isTlsListener()) {
599-
this.clientSubjectManager.subjectFromTransport(null, this.transportSubjectBuilder, this::onTransportSubjectBuilt);
600+
this.clientSubjectManager.subjectFromTransport(null, this.transportSubjectBuilder,
601+
frontendHandler.eventLoopExecutor(), this::onTransportSubjectBuilt);
600602
}
601603
frontendHandler.inClientActive();
602604

kroxylicious-runtime/src/test/java/io/kroxylicious/proxy/internal/ClientSubjectManagerTest.java

Lines changed: 73 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.security.cert.X509Certificate;
1111
import java.util.List;
1212
import java.util.concurrent.CompletableFuture;
13+
import java.util.concurrent.atomic.AtomicBoolean;
1314

1415
import javax.net.ssl.SSLPeerUnverifiedException;
1516
import javax.net.ssl.SSLSession;
@@ -108,7 +109,7 @@ void initialState() {
108109
void transitionInitialToAuthorized() {
109110
// Given
110111
ClientSubjectManager impl = new ClientSubjectManager();
111-
impl.subjectFromTransport(null, context -> CompletableFuture.completedStage(Subject.anonymous()), () -> {
112+
impl.subjectFromTransport(null, context -> CompletableFuture.completedStage(Subject.anonymous()), Runnable::run, () -> {
112113
});
113114
// When
114115
impl.clientSaslAuthenticationSuccess("FOO", new Subject(new User("bob")));
@@ -123,7 +124,7 @@ void transitionInitialToAuthorized() {
123124
void transitionInitialToFailed() {
124125
// Given
125126
ClientSubjectManager impl = new ClientSubjectManager();
126-
impl.subjectFromTransport(null, context -> CompletableFuture.completedStage(Subject.anonymous()), () -> {
127+
impl.subjectFromTransport(null, context -> CompletableFuture.completedStage(Subject.anonymous()), Runnable::run, () -> {
127128
});
128129
// When
129130
impl.clientSaslAuthenticationFailure();
@@ -135,7 +136,7 @@ void transitionInitialToFailed() {
135136
void transitionAuthorizedToAuthorized() {
136137
// Given
137138
ClientSubjectManager impl = new ClientSubjectManager();
138-
impl.subjectFromTransport(null, context -> CompletableFuture.completedStage(Subject.anonymous()), () -> {
139+
impl.subjectFromTransport(null, context -> CompletableFuture.completedStage(Subject.anonymous()), Runnable::run, () -> {
139140
});
140141
impl.clientSaslAuthenticationSuccess("FOO", new Subject(new User("bob")));
141142
// When
@@ -151,7 +152,7 @@ void transitionAuthorizedToAuthorized() {
151152
void transitionAuthorizedToFailed() {
152153
// Given
153154
ClientSubjectManager impl = new ClientSubjectManager();
154-
impl.subjectFromTransport(null, context -> CompletableFuture.completedStage(Subject.anonymous()), () -> {
155+
impl.subjectFromTransport(null, context -> CompletableFuture.completedStage(Subject.anonymous()), Runnable::run, () -> {
155156
});
156157
impl.clientSaslAuthenticationSuccess("FOO", new Subject(new User("bob")));
157158
// When
@@ -164,7 +165,7 @@ void transitionAuthorizedToFailed() {
164165
void transitionFailedToAuthorized() {
165166
// Given
166167
ClientSubjectManager impl = new ClientSubjectManager();
167-
impl.subjectFromTransport(null, context -> CompletableFuture.completedStage(Subject.anonymous()), () -> {
168+
impl.subjectFromTransport(null, context -> CompletableFuture.completedStage(Subject.anonymous()), Runnable::run, () -> {
168169
});
169170
impl.clientSaslAuthenticationFailure();
170171

@@ -177,4 +178,71 @@ void transitionFailedToAuthorized() {
177178
});
178179
}
179180

181+
@Test
182+
void subjectFromTransportUsesProvidedExecutor() {
183+
// Given
184+
ClientSubjectManager impl = new ClientSubjectManager();
185+
AtomicBoolean executorUsed = new AtomicBoolean(false);
186+
AtomicBoolean callbackRan = new AtomicBoolean(false);
187+
188+
// When
189+
impl.subjectFromTransport(
190+
null,
191+
context -> CompletableFuture.completedStage(Subject.anonymous()),
192+
command -> {
193+
executorUsed.set(true);
194+
command.run();
195+
},
196+
() -> callbackRan.set(true));
197+
198+
// Then
199+
assertThat(executorUsed).isTrue();
200+
assertThat(callbackRan).isTrue();
201+
}
202+
203+
@Test
204+
void subjectFromTransportHandlesAsyncCompletion() {
205+
// Given
206+
ClientSubjectManager impl = new ClientSubjectManager();
207+
CompletableFuture<Subject> asyncFuture = new CompletableFuture<>();
208+
AtomicBoolean callbackRan = new AtomicBoolean(false);
209+
Subject expectedSubject = new Subject(new User("alice"));
210+
211+
// When
212+
impl.subjectFromTransport(
213+
null,
214+
context -> asyncFuture,
215+
Runnable::run,
216+
() -> callbackRan.set(true));
217+
218+
assertThat(callbackRan).isFalse(); // Not yet complete
219+
220+
asyncFuture.complete(expectedSubject);
221+
222+
// Then
223+
assertThat(callbackRan).isTrue();
224+
assertThat(impl.authenticatedSubject()).isEqualTo(expectedSubject);
225+
}
226+
227+
@Test
228+
void subjectFromTransportHandlesAsyncException() {
229+
// Given
230+
ClientSubjectManager impl = new ClientSubjectManager();
231+
CompletableFuture<Subject> asyncFuture = new CompletableFuture<>();
232+
AtomicBoolean callbackRan = new AtomicBoolean(false);
233+
234+
// When
235+
impl.subjectFromTransport(
236+
null,
237+
context -> asyncFuture,
238+
Runnable::run,
239+
() -> callbackRan.set(true));
240+
241+
asyncFuture.completeExceptionally(new RuntimeException("Transport auth failed"));
242+
243+
// Then
244+
assertThat(callbackRan).isTrue();
245+
assertThat(impl.authenticatedSubject()).isEqualTo(Subject.anonymous());
246+
}
247+
180248
}

kroxylicious-runtime/src/test/java/io/kroxylicious/proxy/internal/KafkaProxyFrontendHandlerMockCollaboratorsTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
2929
import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
3030
import io.netty.handler.timeout.IdleStateHandler;
31+
import io.netty.util.concurrent.EventExecutor;
3132

3233
import io.kroxylicious.proxy.authentication.Subject;
3334
import io.kroxylicious.proxy.authentication.TransportSubjectBuilder;
@@ -46,6 +47,7 @@
4647
import static org.assertj.core.api.Assertions.assertThat;
4748
import static org.mockito.ArgumentMatchers.any;
4849
import static org.mockito.ArgumentMatchers.eq;
50+
import static org.mockito.Mockito.doAnswer;
4951
import static org.mockito.Mockito.mock;
5052
import static org.mockito.Mockito.never;
5153
import static org.mockito.Mockito.times;
@@ -90,6 +92,9 @@ class KafkaProxyFrontendHandlerMockCollaboratorsTest {
9092
@Mock(strictness = Mock.Strictness.LENIENT)
9193
TransportSubjectBuilder subjectBuilder;
9294

95+
@Mock(strictness = Mock.Strictness.LENIENT)
96+
EventExecutor executor;
97+
9398
private KafkaProxyFrontendHandler handler;
9499

95100
@BeforeEach
@@ -99,6 +104,14 @@ void setUp() {
99104
when(endpointBinding.endpointGateway()).thenReturn(endpointGateway);
100105
when(proxyChannelStateMachine.endpointBinding()).thenReturn(endpointBinding);
101106
when(proxyChannelStateMachine.virtualCluster()).thenReturn(virtualCluster);
107+
// Make the executor run tasks synchronously
108+
doAnswer(invocation -> {
109+
Runnable runnable = invocation.getArgument(0);
110+
runnable.run();
111+
return null;
112+
}).when(executor).execute(any(Runnable.class));
113+
when(clientCtx.executor()).thenReturn(executor);
114+
102115
handler = new KafkaProxyFrontendHandler(
103116
pfr,
104117
filterChainFactory,
@@ -339,4 +352,16 @@ void shouldNotMarkSessionAuthenticatedWhenSessionTransportAuthenticatedIsAnonymo
339352
assertThat(pcsm.kafkaSession().currentState())
340353
.isEqualTo(KafkaSessionState.ESTABLISHING);
341354
}
355+
356+
@Test
357+
void eventLoopExecutorReturnsContextExecutor() throws Exception {
358+
// Given
359+
handler.channelActive(clientCtx);
360+
361+
// When
362+
var result = handler.eventLoopExecutor();
363+
364+
// Then
365+
assertThat(result).isSameAs(executor);
366+
}
342367
}

kroxylicious-runtime/src/test/java/io/kroxylicious/proxy/internal/KafkaProxyFrontendHandlerTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,7 @@ private ChannelHandlerContext mockChannelContext() {
588588
ChannelPipeline mockPipeline = mock(ChannelPipeline.class);
589589
doReturn(inboundChannel).when(mockChannelCtx).channel();
590590
doReturn(mockPipeline).when(mockChannelCtx).pipeline();
591+
doReturn(inboundChannel.eventLoop()).when(mockChannelCtx).executor();
591592
return mockChannelCtx;
592593
}
593594

kroxylicious-runtime/src/test/java/io/kroxylicious/proxy/internal/ProxyChannelStateMachineTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88

99
import java.util.List;
1010
import java.util.Optional;
11+
import java.util.concurrent.atomic.AtomicBoolean;
1112
import java.util.stream.Stream;
1213

1314
import javax.net.ssl.SSLException;
15+
import javax.net.ssl.SSLSession;
1416

1517
import org.apache.kafka.common.errors.ApiException;
1618
import org.apache.kafka.common.errors.InvalidRequestException;
@@ -110,6 +112,8 @@ void setUp() {
110112
when(endpointGateway.virtualCluster()).thenReturn(VIRTUAL_CLUSTER_MODEL);
111113
proxyChannelStateMachine = new ProxyChannelStateMachine(endpointBinding, new DefaultSubjectBuilder(List.of()), new KafkaSession(KafkaSessionState.ESTABLISHING));
112114
when(frontendHandler.channelId()).thenReturn(DefaultChannelId.newInstance());
115+
// Make the executor run tasks synchronously for tests
116+
when(frontendHandler.eventLoopExecutor()).thenReturn(Runnable::run);
113117
}
114118

115119
@AfterEach
@@ -1015,6 +1019,24 @@ private int getVirtualNodeProxyToServerActiveConnections() {
10151019
return io.kroxylicious.proxy.internal.util.Metrics.proxyToServerConnectionCounter(VIRTUAL_CLUSTER_NODE).get();
10161020
}
10171021

1022+
@Test
1023+
void onClientTlsHandshakeSuccessPassesExecutorToSubjectManager() {
1024+
// Given
1025+
proxyChannelStateMachine.onClientActive(frontendHandler);
1026+
SSLSession sslSession = mock(SSLSession.class);
1027+
AtomicBoolean executorUsed = new AtomicBoolean(false);
1028+
when(frontendHandler.eventLoopExecutor()).thenReturn(command -> {
1029+
executorUsed.set(true);
1030+
command.run();
1031+
});
1032+
1033+
// When
1034+
proxyChannelStateMachine.onClientTlsHandshakeSuccess(sslSession);
1035+
1036+
// Then - verify the executor was actually used, proving the new parameter is passed through correctly
1037+
assertThat(executorUsed).isTrue();
1038+
}
1039+
10181040
@org.junit.jupiter.api.Nested
10191041
class DisconnectMetricsTest {
10201042

0 commit comments

Comments
 (0)