Skip to content

Commit e1a923d

Browse files
authored
binding-kafka events update (#1768)
1 parent b6a9f58 commit e1a923d

44 files changed

Lines changed: 2006 additions & 6 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventContext.java

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,15 @@
1717

1818
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.API_VERSION_REJECTED;
1919
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.AUTHORIZATION_FAILED;
20+
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.BROKER_CONNECTION_FAILED;
2021
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.CLUSTER_AUTHORIZATION_FAILED;
22+
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.GROUP_AUTHORIZATION_FAILED;
23+
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.OFFSET_COMMIT_FAILED;
24+
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.PRODUCE_ERROR;
2125
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.SASL_AUTHENTICATION_FAILED;
2226
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.TOPIC_AUTHORIZATION_FAILED;
27+
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.TRANSACTIONAL_ID_AUTHORIZATION_FAILED;
28+
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.UNSUPPORTED_SASL_MECHANISM;
2329

2430
import java.nio.ByteBuffer;
2531
import java.time.Clock;
@@ -47,6 +53,12 @@ public class KafkaEventContext
4753
private final int clusterAuthorizationFailedEventId;
4854
private final int topicAuthorizationFailedEventId;
4955
private final int saslAuthenticationFailedEventId;
56+
private final int brokerConnectionFailedEventId;
57+
private final int produceErrorEventId;
58+
private final int groupAuthorizationFailedEventId;
59+
private final int transactionalIdAuthorizationFailedEventId;
60+
private final int unsupportedSaslMechanismEventId;
61+
private final int offsetCommitFailedEventId;
5062
private final MessageConsumer eventWriter;
5163
private final Clock clock;
5264

@@ -59,6 +71,13 @@ public KafkaEventContext(
5971
this.clusterAuthorizationFailedEventId = context.supplyEventId("binding.kafka.cluster.authorization.failed");
6072
this.topicAuthorizationFailedEventId = context.supplyEventId("binding.kafka.topic.authorization.failed");
6173
this.saslAuthenticationFailedEventId = context.supplyEventId("binding.kafka.sasl.authentication.failed");
74+
this.brokerConnectionFailedEventId = context.supplyEventId("binding.kafka.broker.connection.failed");
75+
this.produceErrorEventId = context.supplyEventId("binding.kafka.produce.error");
76+
this.groupAuthorizationFailedEventId = context.supplyEventId("binding.kafka.group.authorization.failed");
77+
this.transactionalIdAuthorizationFailedEventId =
78+
context.supplyEventId("binding.kafka.transactional.id.authorization.failed");
79+
this.unsupportedSaslMechanismEventId = context.supplyEventId("binding.kafka.unsupported.sasl.mechanism");
80+
this.offsetCommitFailedEventId = context.supplyEventId("binding.kafka.offset.commit.failed");
6281
this.eventWriter = context.supplyEventWriter();
6382
this.clock = context.clock();
6483
}
@@ -195,4 +214,158 @@ public void saslAuthenticationFailed(
195214
.build();
196215
eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit());
197216
}
217+
218+
public void brokerConnectionFailed(
219+
long traceId,
220+
long bindingId,
221+
String host,
222+
int port)
223+
{
224+
KafkaEventExFW extension = kafkaEventExRW
225+
.wrap(extensionBuffer, 0, extensionBuffer.capacity())
226+
.brokerConnectionFailed(e -> e
227+
.typeId(BROKER_CONNECTION_FAILED.value())
228+
.host(host)
229+
.port(port)
230+
)
231+
.build();
232+
EventFW event = eventRW
233+
.wrap(eventBuffer, 0, eventBuffer.capacity())
234+
.id(brokerConnectionFailedEventId)
235+
.timestamp(clock.millis())
236+
.traceId(traceId)
237+
.namespacedId(bindingId)
238+
.extension(extension.buffer(), extension.offset(), extension.limit())
239+
.build();
240+
eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit());
241+
}
242+
243+
public void produceError(
244+
long traceId,
245+
long bindingId,
246+
int apiKey,
247+
int apiVersion,
248+
int errorCode,
249+
String topic)
250+
{
251+
KafkaEventExFW extension = kafkaEventExRW
252+
.wrap(extensionBuffer, 0, extensionBuffer.capacity())
253+
.produceError(e -> e
254+
.typeId(PRODUCE_ERROR.value())
255+
.apiKey(apiKey)
256+
.apiVersion(apiVersion)
257+
.errorCode(errorCode)
258+
.topic(topic)
259+
)
260+
.build();
261+
EventFW event = eventRW
262+
.wrap(eventBuffer, 0, eventBuffer.capacity())
263+
.id(produceErrorEventId)
264+
.timestamp(clock.millis())
265+
.traceId(traceId)
266+
.namespacedId(bindingId)
267+
.extension(extension.buffer(), extension.offset(), extension.limit())
268+
.build();
269+
eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit());
270+
}
271+
272+
public void groupAuthorizationFailed(
273+
long traceId,
274+
long bindingId,
275+
int apiKey,
276+
int apiVersion)
277+
{
278+
KafkaEventExFW extension = kafkaEventExRW
279+
.wrap(extensionBuffer, 0, extensionBuffer.capacity())
280+
.groupAuthorizationFailed(e -> e
281+
.typeId(GROUP_AUTHORIZATION_FAILED.value())
282+
.apiKey(apiKey)
283+
.apiVersion(apiVersion)
284+
)
285+
.build();
286+
EventFW event = eventRW
287+
.wrap(eventBuffer, 0, eventBuffer.capacity())
288+
.id(groupAuthorizationFailedEventId)
289+
.timestamp(clock.millis())
290+
.traceId(traceId)
291+
.namespacedId(bindingId)
292+
.extension(extension.buffer(), extension.offset(), extension.limit())
293+
.build();
294+
eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit());
295+
}
296+
297+
public void transactionalIdAuthorizationFailed(
298+
long traceId,
299+
long bindingId,
300+
int apiKey,
301+
int apiVersion)
302+
{
303+
KafkaEventExFW extension = kafkaEventExRW
304+
.wrap(extensionBuffer, 0, extensionBuffer.capacity())
305+
.transactionalIdAuthorizationFailed(e -> e
306+
.typeId(TRANSACTIONAL_ID_AUTHORIZATION_FAILED.value())
307+
.apiKey(apiKey)
308+
.apiVersion(apiVersion)
309+
)
310+
.build();
311+
EventFW event = eventRW
312+
.wrap(eventBuffer, 0, eventBuffer.capacity())
313+
.id(transactionalIdAuthorizationFailedEventId)
314+
.timestamp(clock.millis())
315+
.traceId(traceId)
316+
.namespacedId(bindingId)
317+
.extension(extension.buffer(), extension.offset(), extension.limit())
318+
.build();
319+
eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit());
320+
}
321+
322+
public void unsupportedSaslMechanism(
323+
long traceId,
324+
long bindingId,
325+
String mechanism)
326+
{
327+
KafkaEventExFW extension = kafkaEventExRW
328+
.wrap(extensionBuffer, 0, extensionBuffer.capacity())
329+
.unsupportedSaslMechanism(e -> e
330+
.typeId(UNSUPPORTED_SASL_MECHANISM.value())
331+
.mechanism(mechanism)
332+
)
333+
.build();
334+
EventFW event = eventRW
335+
.wrap(eventBuffer, 0, eventBuffer.capacity())
336+
.id(unsupportedSaslMechanismEventId)
337+
.timestamp(clock.millis())
338+
.traceId(traceId)
339+
.namespacedId(bindingId)
340+
.extension(extension.buffer(), extension.offset(), extension.limit())
341+
.build();
342+
eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit());
343+
}
344+
345+
public void offsetCommitFailed(
346+
long traceId,
347+
long bindingId,
348+
int apiKey,
349+
int apiVersion,
350+
int errorCode)
351+
{
352+
KafkaEventExFW extension = kafkaEventExRW
353+
.wrap(extensionBuffer, 0, extensionBuffer.capacity())
354+
.offsetCommitFailed(e -> e
355+
.typeId(OFFSET_COMMIT_FAILED.value())
356+
.apiKey(apiKey)
357+
.apiVersion(apiVersion)
358+
.errorCode(errorCode)
359+
)
360+
.build();
361+
EventFW event = eventRW
362+
.wrap(eventBuffer, 0, eventBuffer.capacity())
363+
.id(offsetCommitFailedEventId)
364+
.timestamp(clock.millis())
365+
.traceId(traceId)
366+
.namespacedId(bindingId)
367+
.extension(extension.buffer(), extension.offset(), extension.limit())
368+
.build();
369+
eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit());
370+
}
198371
}

runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventFormatter.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,16 @@
2121
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.EventFW;
2222
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaApiVersionRejectedExFW;
2323
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaAuthorizationFailedExFW;
24+
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaBrokerConnectionFailedExFW;
2425
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaClusterAuthorizationFailedExFW;
2526
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventExFW;
27+
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaGroupAuthorizationFailedExFW;
28+
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaOffsetCommitFailedExFW;
29+
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaProduceErrorExFW;
2630
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaSaslAuthenticationFailedExFW;
2731
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaTopicAuthorizationFailedExFW;
32+
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaTransactionalIdAuthorizationFailedExFW;
33+
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaUnsupportedSaslMechanismExFW;
2834
import io.aklivity.zilla.runtime.engine.Configuration;
2935
import io.aklivity.zilla.runtime.engine.event.EventFormatterSpi;
3036

@@ -84,6 +90,50 @@ public String format(
8490
asString(ex.identity()), asString(ex.error()));
8591
break;
8692
}
93+
case BROKER_CONNECTION_FAILED:
94+
{
95+
final KafkaBrokerConnectionFailedExFW ex = extension.brokerConnectionFailed();
96+
result = String.format("Broker connection failed for host (%s), port (%d).",
97+
asString(ex.host()), ex.port());
98+
break;
99+
}
100+
case PRODUCE_ERROR:
101+
{
102+
final KafkaProduceErrorExFW ex = extension.produceError();
103+
KafkaApiKey apiKey = KafkaApiKey.of(ex.apiKey());
104+
result = String.format("%s (Version: %d) Produce error (%d) for topic (%s).",
105+
apiKey.title(), ex.apiVersion(), ex.errorCode(), asString(ex.topic()));
106+
break;
107+
}
108+
case GROUP_AUTHORIZATION_FAILED:
109+
{
110+
final KafkaGroupAuthorizationFailedExFW ex = extension.groupAuthorizationFailed();
111+
KafkaApiKey apiKey = KafkaApiKey.of(ex.apiKey());
112+
result = String.format("%s (Version: %d) Group authorization failed.", apiKey.title(), ex.apiVersion());
113+
break;
114+
}
115+
case TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
116+
{
117+
final KafkaTransactionalIdAuthorizationFailedExFW ex = extension.transactionalIdAuthorizationFailed();
118+
KafkaApiKey apiKey = KafkaApiKey.of(ex.apiKey());
119+
result = String.format("%s (Version: %d) Transactional id authorization failed.",
120+
apiKey.title(), ex.apiVersion());
121+
break;
122+
}
123+
case UNSUPPORTED_SASL_MECHANISM:
124+
{
125+
final KafkaUnsupportedSaslMechanismExFW ex = extension.unsupportedSaslMechanism();
126+
result = String.format("Unsupported SASL mechanism (%s).", asString(ex.mechanism()));
127+
break;
128+
}
129+
case OFFSET_COMMIT_FAILED:
130+
{
131+
final KafkaOffsetCommitFailedExFW ex = extension.offsetCommitFailed();
132+
KafkaApiKey apiKey = KafkaApiKey.of(ex.apiKey());
133+
result = String.format("%s (Version: %d) Offset commit failed with error (%d).",
134+
apiKey.title(), ex.apiVersion(), ex.errorCode());
135+
break;
136+
}
87137
}
88138
return result;
89139
}

runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheClientProduceFactory.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaBindingConfig;
5050
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaRouteConfig;
5151
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaTopicType;
52+
import io.aklivity.zilla.runtime.binding.kafka.internal.events.KafkaEventContext;
5253
import io.aklivity.zilla.runtime.binding.kafka.internal.types.Array32FW;
5354
import io.aklivity.zilla.runtime.binding.kafka.internal.types.Flyweight;
5455
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaAckMode;
@@ -101,6 +102,9 @@ public final class KafkaCacheClientProduceFactory implements BindingHandler
101102
private static final short PRODUCE_FLUSH_PRODUCER_EPOCH = -1;
102103
private static final int PRODUCE_FLUSH_SEQUENCE = -1;
103104

105+
private static final int PRODUCE_API_KEY = 0;
106+
private static final int PRODUCE_API_VERSION = 3;
107+
104108
private static final int ERROR_CORRUPT_MESSAGE = 2;
105109
private static final int ERROR_NOT_LEADER_FOR_PARTITION = 6;
106110
private static final int ERROR_RECORD_LIST_TOO_LARGE = 18;
@@ -153,6 +157,7 @@ public final class KafkaCacheClientProduceFactory implements BindingHandler
153157
private final KafkaCacheEntryFW entryRO = new KafkaCacheEntryFW();
154158

155159
private final int kafkaTypeId;
160+
private final KafkaEventContext event;
156161
private final BufferPool bufferPool;
157162
private final BudgetCreditor creditor;
158163
private final Signaler signaler;
@@ -185,6 +190,7 @@ public KafkaCacheClientProduceFactory(
185190
{
186191
this.context = context;
187192
this.kafkaTypeId = context.supplyTypeId(KafkaBinding.NAME);
193+
this.event = new KafkaEventContext(context);
188194
this.writeBuffer = new UnsafeBuffer(new byte[context.writeBuffer().capacity()]);
189195
this.extBuffer = new UnsafeBuffer(new byte[context.writeBuffer().capacity()]);
190196
this.bufferPool = context.bufferPool();
@@ -767,6 +773,7 @@ private void onClientInitialData(
767773

768774
if (error != NO_ERROR)
769775
{
776+
event.produceError(traceId, originId, PRODUCE_API_KEY, PRODUCE_API_VERSION, error, topicName);
770777
stream.cleanupClient(traceId, error);
771778
onClientFanMemberClosed(traceId, stream);
772779
}
@@ -811,6 +818,7 @@ stream.valueMark, stream.valueLimit, now().toEpochMilli(), stream.initialId, PRO
811818

812819
if (error != NO_ERROR)
813820
{
821+
event.produceError(traceId, originId, PRODUCE_API_KEY, PRODUCE_API_VERSION, error, topicName);
814822
stream.cleanupClient(traceId, error);
815823
onClientFanMemberClosed(traceId, stream);
816824
}

runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientConnectionPool.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaConfiguration;
4444
import io.aklivity.zilla.runtime.binding.kafka.internal.budget.MergedBudgetCreditor;
4545
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaBindingConfig;
46+
import io.aklivity.zilla.runtime.binding.kafka.internal.events.KafkaEventContext;
4647
import io.aklivity.zilla.runtime.binding.kafka.internal.types.Flyweight;
4748
import io.aklivity.zilla.runtime.binding.kafka.internal.types.OctetsFW;
4849
import io.aklivity.zilla.runtime.binding.kafka.internal.types.ProxyAddressInetFW;
@@ -82,6 +83,8 @@ public final class KafkaClientConnectionPool extends KafkaClientSaslHandshaker
8283
private static final int SIGNAL_CONNECTION_CLEANUP = 0x80000007;
8384
private static final int SIGNAL_NEXT_REQUEST = 0x80000008;
8485

86+
private static final int ERROR_UNSUPPORTED_SASL_MECHANISM = 33;
87+
8588
private final BeginFW beginRO = new BeginFW();
8689
private final DataFW dataRO = new DataFW();
8790
private final EndFW endRO = new EndFW();
@@ -129,6 +132,7 @@ public final class KafkaClientConnectionPool extends KafkaClientSaslHandshaker
129132
private final Object2ObjectHashMap<String, KafkaClientConnection> connectionPool;
130133
private final Long2ObjectHashMap<KafkaClientStream> streamsByInitialId;
131134
private final long connectionPoolCleanupMillis;
135+
private final KafkaEventContext event;
132136

133137
public KafkaClientConnectionPool(
134138
KafkaConfiguration config,
@@ -153,6 +157,7 @@ public KafkaClientConnectionPool(
153157
this.connectionPool = new Object2ObjectHashMap<>();
154158
this.streamsByInitialId = new Long2ObjectHashMap<>();
155159
this.connectionPoolCleanupMillis = config.clientConnectionPoolCleanupMillis();
160+
this.event = new KafkaEventContext(context);
156161
}
157162

158163
private MessageConsumer newStream(
@@ -1658,6 +1663,11 @@ private void onConnectionAbort(
16581663

16591664
doConnectionAbort(traceId);
16601665

1666+
if (server != null)
1667+
{
1668+
event.brokerConnectionFailed(traceId, originId, server.host, server.port);
1669+
}
1670+
16611671
cleanupStreams(traceId);
16621672
}
16631673

@@ -1714,6 +1724,11 @@ private void onConnectionReset(
17141724

17151725
cleanupBudgetCreditorIfNecessary();
17161726

1727+
if (server != null)
1728+
{
1729+
event.brokerConnectionFailed(traceId, originId, server.host, server.port);
1730+
}
1731+
17171732
cleanupStreams(traceId);
17181733
}
17191734

@@ -2043,6 +2058,10 @@ protected void onDecodeSaslHandshakeResponse(
20432058
decoder = decodeSaslAuthenticateResponse;
20442059
break;
20452060
default:
2061+
if (ERROR_UNSUPPORTED_SASL_MECHANISM == errorCode)
2062+
{
2063+
event.unsupportedSaslMechanism(traceId, originId, sasl.mechanism);
2064+
}
20462065
cleanupConnection(traceId);
20472066
break;
20482067
}

0 commit comments

Comments
 (0)