Skip to content

Commit 60261b0

Browse files
authored
chore: fix ProxyChannelStateMachine logging policy compliance (kroxylicious#3752)
* Fix ProxyChannelStateMachine logging policy compliance All log statements now include sessionId and virtualCluster as structured key-values, enabling operators to correlate log events to a specific session and cluster. Separate remoteHost/remotePort and clientHost/clientPort keys are replaced with combined address and clientAddress keys per policy. Assisted-by: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk> * ensure the mock handler behaves Signed-off-by: Sam Barker <sam@quadrocket.co.uk> # Conflicts: # kroxylicious-runtime/src/test/java/io/kroxylicious/proxy/internal/ProxyChannelStateMachineTest.java * Extract sessionId and virtualCluster log key constants Avoids duplicating the string literals across every log call in the class. Assisted-by: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk> * Revert "Extract sessionId and virtualCluster log key constants" This reverts commit fe33af2. Signed-off-by: Sam Barker <sam@quadrocket.co.uk> * Extract log(Level) helper to PSCM to reduce per-call boilerplate Adds a private log(Level) method that pre-populates sessionId and virtualCluster on the LoggingEventBuilder before returning it to the call site, mirroring the pattern already used by FilterHandler. Assisted-by: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk> * Use Supplier form of addKeyValue for lazily-evaluated address keys Avoids string construction when DEBUG/INFO logging is disabled. Assisted-by: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk> * Extract local variable to avoid requireNonNull inside logging supplier Assisted-by: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: Sam Barker <sam@quadrocket.co.uk> --------- Signed-off-by: Sam Barker <sam@quadrocket.co.uk>
1 parent 9079f82 commit 60261b0

2 files changed

Lines changed: 29 additions & 13 deletions

File tree

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/ProxyChannelStateMachine.java

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import org.apache.kafka.common.protocol.ApiKeys;
1818
import org.apache.kafka.common.protocol.Errors;
1919
import org.slf4j.Logger;
20+
import org.slf4j.event.Level;
21+
import org.slf4j.spi.LoggingEventBuilder;
2022

2123
import io.micrometer.core.instrument.Counter;
2224
import io.micrometer.core.instrument.Timer;
@@ -224,6 +226,8 @@ void forceState(ProxyChannelState state,
224226
KafkaSession kafkaSession,
225227
int transportAndBackendLatch) {
226228
LOGGER.atInfo()
229+
.addKeyValue("sessionId", kafkaSession.sessionId())
230+
.addKeyValue("virtualCluster", clusterName())
227231
.addKeyValue("state", state)
228232
.addKeyValue("frontendHandler", frontendHandler)
229233
.addKeyValue("backendHandler", backendHandler)
@@ -332,10 +336,8 @@ public void onServerWritable() {
332336
void onClientActive(KafkaProxyFrontendHandler frontendHandler) {
333337
if (STARTING_STATE.equals(this.state)) {
334338
this.frontendHandler = frontendHandler;
335-
LOGGER.atDebug()
336-
.addKeyValue("sessionId", kafkaSession.sessionId())
337-
.addKeyValue("remoteHost", Objects.requireNonNull(this.frontendHandler).remoteHost())
338-
.addKeyValue("remotePort", this.frontendHandler.remotePort())
339+
log(Level.DEBUG)
340+
.addKeyValue("address", () -> HostPort.asString(frontendHandler.remoteHost(), frontendHandler.remotePort()))
339341
.log("Allocated session ID for downstream connection");
340342
ProxyChannelState.ClientActive clientActive = STARTING_STATE.toClientActive();
341343
toClientActive(clientActive, frontendHandler);
@@ -387,7 +389,7 @@ void onServerActive() {
387389
*/
388390
void illegalState(String msg) {
389391
if (!(state instanceof Closed)) {
390-
LOGGER.atError()
392+
log(Level.ERROR)
391393
.addKeyValue("state", state)
392394
.addKeyValue("message", msg)
393395
.log("Unexpected event, closing channels with no client response");
@@ -496,7 +498,7 @@ void onClientIdle() {
496498
*/
497499
@SuppressWarnings("java:S5738")
498500
void onServerException(@Nullable Throwable cause) {
499-
LOGGER.atWarn()
501+
log(Level.WARN)
500502
.addKeyValue("error", cause != null ? cause.getMessage() : "")
501503
.setCause(LOGGER.isDebugEnabled() ? cause : null)
502504
.log(LOGGER.isDebugEnabled()
@@ -522,15 +524,15 @@ void onClientException(@Nullable Throwable cause) {
522524
: " Possible unexpected TLS handshake? When connecting via TLS from your client, make sure to enable TLS for the Kroxylicious gateway ("
523525
+ StableKroxyliciousLinkGenerator.INSTANCE.errorLink(StableKroxyliciousLinkGenerator.CLIENT_TLS)
524526
+ ").";
525-
LOGGER.atWarn()
527+
log(Level.WARN)
526528
.addKeyValue("maxFrameSizeBytes", e.getMaxFrameSizeBytes())
527529
.addKeyValue("receivedFrameSizeBytes", e.getReceivedFrameSizeBytes())
528530
.addKeyValue("hint", tlsHint)
529531
.log("Received over-sized frame from client, other possible causes are: an oversized Kafka frame, or something unexpected like an HTTP request");
530532
errorCodeEx = Errors.INVALID_REQUEST.exception();
531533
}
532534
else {
533-
LOGGER.atWarn()
535+
log(Level.WARN)
534536
.addKeyValue("error", cause != null ? cause.getMessage() : "")
535537
.setCause(LOGGER.isDebugEnabled() ? cause : null)
536538
.log(LOGGER.isDebugEnabled()
@@ -632,11 +634,10 @@ private void toConnecting(
632634
backendHandler = new KafkaProxyBackendHandler(this);
633635
Objects.requireNonNull(frontendHandler).inConnecting(connecting.remote(), backendHandler);
634636
proxyToServerConnectionCounter.increment();
635-
LOGGER.atDebug()
636-
.addKeyValue("sessionId", kafkaSession.sessionId())
637+
var frontend = Objects.requireNonNull(this.frontendHandler);
638+
log(Level.DEBUG)
637639
.addKeyValue("remote", connecting.remote())
638-
.addKeyValue("clientHost", Objects.requireNonNull(this.frontendHandler).remoteHost())
639-
.addKeyValue("clientPort", this.frontendHandler.remotePort())
640+
.addKeyValue("clientAddress", () -> HostPort.asString(frontend.remoteHost(), frontend.remotePort()))
640641
.log("Upstream connection established for client");
641642
}
642643

@@ -743,13 +744,26 @@ private void incrementAppropriateDisconnectsMetric(@Nullable DisconnectCause dis
743744
}
744745

745746
private void setState(ProxyChannelState state) {
746-
LOGGER.atTrace()
747+
log(Level.TRACE)
747748
.addKeyValue("stateMachine", this)
748749
.addKeyValue("targetState", state)
749750
.log("Transitioning to state");
750751
this.state = state;
751752
}
752753

754+
private LoggingEventBuilder log(Level level) {
755+
LoggingEventBuilder builder = switch (level) {
756+
case ERROR -> LOGGER.atError();
757+
case WARN -> LOGGER.atWarn();
758+
case INFO -> LOGGER.atInfo();
759+
case DEBUG -> LOGGER.atDebug();
760+
case TRACE -> LOGGER.atTrace();
761+
};
762+
return builder
763+
.addKeyValue("sessionId", kafkaSession.sessionId())
764+
.addKeyValue("virtualCluster", clusterName());
765+
}
766+
753767
private static boolean isMessageApiVersionsRequest(Object msg) {
754768
return msg instanceof DecodedRequestFrame
755769
&& ((DecodedRequestFrame<?>) msg).apiKey() == ApiKeys.API_VERSIONS;

kroxylicious-runtime/src/test/java/io/kroxylicious/proxy/internal/ProxyChannelStateMachineTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ void setUp() {
112112
when(endpointGateway.virtualCluster()).thenReturn(VIRTUAL_CLUSTER_MODEL);
113113
proxyChannelStateMachine = new ProxyChannelStateMachine(endpointBinding, new DefaultSubjectBuilder(List.of()), new KafkaSession(KafkaSessionState.ESTABLISHING));
114114
when(frontendHandler.channelId()).thenReturn(DefaultChannelId.newInstance());
115+
when(frontendHandler.remoteHost()).thenReturn("testhost.example.com");
116+
when(frontendHandler.remotePort()).thenReturn(9476);
115117
// Make the executor run tasks synchronously for tests
116118
when(frontendHandler.eventLoopExecutor()).thenReturn(Runnable::run);
117119
}

0 commit comments

Comments
 (0)