Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,15 @@

import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.API_VERSION_REJECTED;
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.AUTHORIZATION_FAILED;
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.BROKER_CONNECTION_FAILED;
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.CLUSTER_AUTHORIZATION_FAILED;
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.GROUP_AUTHORIZATION_FAILED;
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.OFFSET_COMMIT_FAILED;
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.PRODUCE_ERROR;
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.SASL_AUTHENTICATION_FAILED;
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.TOPIC_AUTHORIZATION_FAILED;
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.TRANSACTIONAL_ID_AUTHORIZATION_FAILED;
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.UNSUPPORTED_SASL_MECHANISM;

import java.nio.ByteBuffer;
import java.time.Clock;
Expand Down Expand Up @@ -47,6 +53,12 @@ public class KafkaEventContext
private final int clusterAuthorizationFailedEventId;
private final int topicAuthorizationFailedEventId;
private final int saslAuthenticationFailedEventId;
private final int brokerConnectionFailedEventId;
private final int produceErrorEventId;
private final int groupAuthorizationFailedEventId;
private final int transactionalIdAuthorizationFailedEventId;
private final int unsupportedSaslMechanismEventId;
private final int offsetCommitFailedEventId;
private final MessageConsumer eventWriter;
private final Clock clock;

Expand All @@ -59,6 +71,13 @@ public KafkaEventContext(
this.clusterAuthorizationFailedEventId = context.supplyEventId("binding.kafka.cluster.authorization.failed");
this.topicAuthorizationFailedEventId = context.supplyEventId("binding.kafka.topic.authorization.failed");
this.saslAuthenticationFailedEventId = context.supplyEventId("binding.kafka.sasl.authentication.failed");
this.brokerConnectionFailedEventId = context.supplyEventId("binding.kafka.broker.connection.failed");
this.produceErrorEventId = context.supplyEventId("binding.kafka.produce.error");
this.groupAuthorizationFailedEventId = context.supplyEventId("binding.kafka.group.authorization.failed");
this.transactionalIdAuthorizationFailedEventId =
context.supplyEventId("binding.kafka.transactional.id.authorization.failed");
this.unsupportedSaslMechanismEventId = context.supplyEventId("binding.kafka.unsupported.sasl.mechanism");
this.offsetCommitFailedEventId = context.supplyEventId("binding.kafka.offset.commit.failed");
this.eventWriter = context.supplyEventWriter();
this.clock = context.clock();
}
Expand Down Expand Up @@ -195,4 +214,158 @@ public void saslAuthenticationFailed(
.build();
eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit());
}

public void brokerConnectionFailed(
long traceId,
long bindingId,
String host,
int port)
{
KafkaEventExFW extension = kafkaEventExRW
.wrap(extensionBuffer, 0, extensionBuffer.capacity())
.brokerConnectionFailed(e -> e
.typeId(BROKER_CONNECTION_FAILED.value())
.host(host)
.port(port)
)
.build();
EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.id(brokerConnectionFailedEventId)
.timestamp(clock.millis())
.traceId(traceId)
.namespacedId(bindingId)
.extension(extension.buffer(), extension.offset(), extension.limit())
.build();
eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit());
}

public void produceError(
long traceId,
long bindingId,
int apiKey,
int apiVersion,
int errorCode,
String topic)
{
KafkaEventExFW extension = kafkaEventExRW
.wrap(extensionBuffer, 0, extensionBuffer.capacity())
.produceError(e -> e
.typeId(PRODUCE_ERROR.value())
.apiKey(apiKey)
.apiVersion(apiVersion)
.errorCode(errorCode)
.topic(topic)
)
.build();
EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.id(produceErrorEventId)
.timestamp(clock.millis())
.traceId(traceId)
.namespacedId(bindingId)
.extension(extension.buffer(), extension.offset(), extension.limit())
.build();
eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit());
}

public void groupAuthorizationFailed(
long traceId,
long bindingId,
int apiKey,
int apiVersion)
{
KafkaEventExFW extension = kafkaEventExRW
.wrap(extensionBuffer, 0, extensionBuffer.capacity())
.groupAuthorizationFailed(e -> e
.typeId(GROUP_AUTHORIZATION_FAILED.value())
.apiKey(apiKey)
.apiVersion(apiVersion)
)
.build();
EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.id(groupAuthorizationFailedEventId)
.timestamp(clock.millis())
.traceId(traceId)
.namespacedId(bindingId)
.extension(extension.buffer(), extension.offset(), extension.limit())
.build();
eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit());
}

public void transactionalIdAuthorizationFailed(
long traceId,
long bindingId,
int apiKey,
int apiVersion)
{
KafkaEventExFW extension = kafkaEventExRW
.wrap(extensionBuffer, 0, extensionBuffer.capacity())
.transactionalIdAuthorizationFailed(e -> e
.typeId(TRANSACTIONAL_ID_AUTHORIZATION_FAILED.value())
.apiKey(apiKey)
.apiVersion(apiVersion)
)
.build();
EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.id(transactionalIdAuthorizationFailedEventId)
.timestamp(clock.millis())
.traceId(traceId)
.namespacedId(bindingId)
.extension(extension.buffer(), extension.offset(), extension.limit())
.build();
eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit());
}

public void unsupportedSaslMechanism(
long traceId,
long bindingId,
String mechanism)
{
KafkaEventExFW extension = kafkaEventExRW
.wrap(extensionBuffer, 0, extensionBuffer.capacity())
.unsupportedSaslMechanism(e -> e
.typeId(UNSUPPORTED_SASL_MECHANISM.value())
.mechanism(mechanism)
)
.build();
EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.id(unsupportedSaslMechanismEventId)
.timestamp(clock.millis())
.traceId(traceId)
.namespacedId(bindingId)
.extension(extension.buffer(), extension.offset(), extension.limit())
.build();
eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit());
}

public void offsetCommitFailed(
long traceId,
long bindingId,
int apiKey,
int apiVersion,
int errorCode)
{
KafkaEventExFW extension = kafkaEventExRW
.wrap(extensionBuffer, 0, extensionBuffer.capacity())
.offsetCommitFailed(e -> e
.typeId(OFFSET_COMMIT_FAILED.value())
.apiKey(apiKey)
.apiVersion(apiVersion)
.errorCode(errorCode)
)
.build();
EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.id(offsetCommitFailedEventId)
.timestamp(clock.millis())
.traceId(traceId)
.namespacedId(bindingId)
.extension(extension.buffer(), extension.offset(), extension.limit())
.build();
eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,16 @@
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.EventFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaApiVersionRejectedExFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaAuthorizationFailedExFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaBrokerConnectionFailedExFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaClusterAuthorizationFailedExFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventExFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaGroupAuthorizationFailedExFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaOffsetCommitFailedExFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaProduceErrorExFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaSaslAuthenticationFailedExFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaTopicAuthorizationFailedExFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaTransactionalIdAuthorizationFailedExFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaUnsupportedSaslMechanismExFW;
import io.aklivity.zilla.runtime.engine.Configuration;
import io.aklivity.zilla.runtime.engine.event.EventFormatterSpi;

Expand Down Expand Up @@ -84,6 +90,50 @@ public String format(
asString(ex.identity()), asString(ex.error()));
break;
}
case BROKER_CONNECTION_FAILED:
{
final KafkaBrokerConnectionFailedExFW ex = extension.brokerConnectionFailed();
result = String.format("Broker connection failed for host (%s), port (%d).",
asString(ex.host()), ex.port());
break;
}
case PRODUCE_ERROR:
{
final KafkaProduceErrorExFW ex = extension.produceError();
KafkaApiKey apiKey = KafkaApiKey.of(ex.apiKey());
result = String.format("%s (Version: %d) Produce error (%d) for topic (%s).",
apiKey.title(), ex.apiVersion(), ex.errorCode(), asString(ex.topic()));
break;
}
case GROUP_AUTHORIZATION_FAILED:
{
final KafkaGroupAuthorizationFailedExFW ex = extension.groupAuthorizationFailed();
KafkaApiKey apiKey = KafkaApiKey.of(ex.apiKey());
result = String.format("%s (Version: %d) Group authorization failed.", apiKey.title(), ex.apiVersion());
break;
}
case TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
{
final KafkaTransactionalIdAuthorizationFailedExFW ex = extension.transactionalIdAuthorizationFailed();
KafkaApiKey apiKey = KafkaApiKey.of(ex.apiKey());
result = String.format("%s (Version: %d) Transactional id authorization failed.",
apiKey.title(), ex.apiVersion());
break;
}
case UNSUPPORTED_SASL_MECHANISM:
{
final KafkaUnsupportedSaslMechanismExFW ex = extension.unsupportedSaslMechanism();
result = String.format("Unsupported SASL mechanism (%s).", asString(ex.mechanism()));
break;
}
case OFFSET_COMMIT_FAILED:
{
final KafkaOffsetCommitFailedExFW ex = extension.offsetCommitFailed();
KafkaApiKey apiKey = KafkaApiKey.of(ex.apiKey());
result = String.format("%s (Version: %d) Offset commit failed with error (%d).",
apiKey.title(), ex.apiVersion(), ex.errorCode());
break;
}
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaBindingConfig;
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaRouteConfig;
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaTopicType;
import io.aklivity.zilla.runtime.binding.kafka.internal.events.KafkaEventContext;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.Array32FW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.Flyweight;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaAckMode;
Expand Down Expand Up @@ -101,6 +102,9 @@ public final class KafkaCacheClientProduceFactory implements BindingHandler
private static final short PRODUCE_FLUSH_PRODUCER_EPOCH = -1;
private static final int PRODUCE_FLUSH_SEQUENCE = -1;

private static final int PRODUCE_API_KEY = 0;
private static final int PRODUCE_API_VERSION = 3;

private static final int ERROR_CORRUPT_MESSAGE = 2;
private static final int ERROR_NOT_LEADER_FOR_PARTITION = 6;
private static final int ERROR_RECORD_LIST_TOO_LARGE = 18;
Expand Down Expand Up @@ -153,6 +157,7 @@ public final class KafkaCacheClientProduceFactory implements BindingHandler
private final KafkaCacheEntryFW entryRO = new KafkaCacheEntryFW();

private final int kafkaTypeId;
private final KafkaEventContext event;
private final BufferPool bufferPool;
private final BudgetCreditor creditor;
private final Signaler signaler;
Expand Down Expand Up @@ -185,6 +190,7 @@ public KafkaCacheClientProduceFactory(
{
this.context = context;
this.kafkaTypeId = context.supplyTypeId(KafkaBinding.NAME);
this.event = new KafkaEventContext(context);
this.writeBuffer = new UnsafeBuffer(new byte[context.writeBuffer().capacity()]);
this.extBuffer = new UnsafeBuffer(new byte[context.writeBuffer().capacity()]);
this.bufferPool = context.bufferPool();
Expand Down Expand Up @@ -767,6 +773,7 @@ private void onClientInitialData(

if (error != NO_ERROR)
{
event.produceError(traceId, originId, PRODUCE_API_KEY, PRODUCE_API_VERSION, error, topicName);
stream.cleanupClient(traceId, error);
onClientFanMemberClosed(traceId, stream);
}
Expand Down Expand Up @@ -811,6 +818,7 @@ stream.valueMark, stream.valueLimit, now().toEpochMilli(), stream.initialId, PRO

if (error != NO_ERROR)
{
event.produceError(traceId, originId, PRODUCE_API_KEY, PRODUCE_API_VERSION, error, topicName);
stream.cleanupClient(traceId, error);
onClientFanMemberClosed(traceId, stream);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaConfiguration;
import io.aklivity.zilla.runtime.binding.kafka.internal.budget.MergedBudgetCreditor;
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaBindingConfig;
import io.aklivity.zilla.runtime.binding.kafka.internal.events.KafkaEventContext;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.Flyweight;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.OctetsFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.ProxyAddressInetFW;
Expand Down Expand Up @@ -81,6 +82,8 @@ public final class KafkaClientConnectionPool extends KafkaClientSaslHandshaker
private static final int SIGNAL_CONNECTION_CLEANUP = 0x80000007;
private static final int SIGNAL_NEXT_REQUEST = 0x80000008;

private static final int ERROR_UNSUPPORTED_SASL_MECHANISM = 33;

private final BeginFW beginRO = new BeginFW();
private final DataFW dataRO = new DataFW();
private final EndFW endRO = new EndFW();
Expand Down Expand Up @@ -128,6 +131,7 @@ public final class KafkaClientConnectionPool extends KafkaClientSaslHandshaker
private final Object2ObjectHashMap<String, KafkaClientConnection> connectionPool;
private final Long2ObjectHashMap<KafkaClientStream> streamsByInitialId;
private final long connectionPoolCleanupMillis;
private final KafkaEventContext event;

public KafkaClientConnectionPool(
KafkaConfiguration config,
Expand All @@ -152,6 +156,7 @@ public KafkaClientConnectionPool(
this.connectionPool = new Object2ObjectHashMap<>();
this.streamsByInitialId = new Long2ObjectHashMap<>();
this.connectionPoolCleanupMillis = config.clientConnectionPoolCleanupMillis();
this.event = new KafkaEventContext(context);
}

private MessageConsumer newStream(
Expand Down Expand Up @@ -1635,6 +1640,11 @@ private void onConnectionAbort(

doConnectionAbort(traceId);

if (server != null)
{
event.brokerConnectionFailed(traceId, originId, server.host, server.port);
}

cleanupStreams(traceId);
}

Expand Down Expand Up @@ -1691,6 +1701,11 @@ private void onConnectionReset(

cleanupBudgetCreditorIfNecessary();

if (server != null)
{
event.brokerConnectionFailed(traceId, originId, server.host, server.port);
}

cleanupStreams(traceId);
}

Expand Down Expand Up @@ -2020,6 +2035,10 @@ protected void onDecodeSaslHandshakeResponse(
decoder = decodeSaslAuthenticateResponse;
break;
default:
if (ERROR_UNSUPPORTED_SASL_MECHANISM == errorCode)
{
event.unsupportedSaslMechanism(traceId, originId, sasl.mechanism);
}
cleanupConnection(traceId);
break;
}
Expand Down
Loading
Loading