diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventContext.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventContext.java index 29ac413192..3c421cdb29 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventContext.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventContext.java @@ -17,9 +17,15 @@ import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.API_VERSION_REJECTED; import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.AUTHORIZATION_FAILED; +import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.BROKER_CONNECTION_FAILED; import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.CLUSTER_AUTHORIZATION_FAILED; +import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.GROUP_AUTHORIZATION_FAILED; +import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.OFFSET_COMMIT_FAILED; +import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.PRODUCE_ERROR; import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.SASL_AUTHENTICATION_FAILED; import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.TOPIC_AUTHORIZATION_FAILED; +import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.TRANSACTIONAL_ID_AUTHORIZATION_FAILED; +import static io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventType.UNSUPPORTED_SASL_MECHANISM; import java.nio.ByteBuffer; import java.time.Clock; @@ -47,6 +53,12 @@ public class KafkaEventContext private final int clusterAuthorizationFailedEventId; private final int topicAuthorizationFailedEventId; private final int saslAuthenticationFailedEventId; + private final int brokerConnectionFailedEventId; + private final int produceErrorEventId; + private final int groupAuthorizationFailedEventId; + private final int transactionalIdAuthorizationFailedEventId; + private final int unsupportedSaslMechanismEventId; + private final int offsetCommitFailedEventId; private final MessageConsumer eventWriter; private final Clock clock; @@ -59,6 +71,13 @@ public KafkaEventContext( this.clusterAuthorizationFailedEventId = context.supplyEventId("binding.kafka.cluster.authorization.failed"); this.topicAuthorizationFailedEventId = context.supplyEventId("binding.kafka.topic.authorization.failed"); this.saslAuthenticationFailedEventId = context.supplyEventId("binding.kafka.sasl.authentication.failed"); + this.brokerConnectionFailedEventId = context.supplyEventId("binding.kafka.broker.connection.failed"); + this.produceErrorEventId = context.supplyEventId("binding.kafka.produce.error"); + this.groupAuthorizationFailedEventId = context.supplyEventId("binding.kafka.group.authorization.failed"); + this.transactionalIdAuthorizationFailedEventId = + context.supplyEventId("binding.kafka.transactional.id.authorization.failed"); + this.unsupportedSaslMechanismEventId = context.supplyEventId("binding.kafka.unsupported.sasl.mechanism"); + this.offsetCommitFailedEventId = context.supplyEventId("binding.kafka.offset.commit.failed"); this.eventWriter = context.supplyEventWriter(); this.clock = context.clock(); } @@ -195,4 +214,158 @@ public void saslAuthenticationFailed( .build(); eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit()); } + + public void brokerConnectionFailed( + long traceId, + long bindingId, + String host, + int port) + { + KafkaEventExFW extension = kafkaEventExRW + .wrap(extensionBuffer, 0, extensionBuffer.capacity()) + .brokerConnectionFailed(e -> e + .typeId(BROKER_CONNECTION_FAILED.value()) + .host(host) + .port(port) + ) + .build(); + EventFW event = eventRW + .wrap(eventBuffer, 0, eventBuffer.capacity()) + .id(brokerConnectionFailedEventId) + .timestamp(clock.millis()) + .traceId(traceId) + .namespacedId(bindingId) + .extension(extension.buffer(), extension.offset(), extension.limit()) + .build(); + eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit()); + } + + public void produceError( + long traceId, + long bindingId, + int apiKey, + int apiVersion, + int errorCode, + String topic) + { + KafkaEventExFW extension = kafkaEventExRW + .wrap(extensionBuffer, 0, extensionBuffer.capacity()) + .produceError(e -> e + .typeId(PRODUCE_ERROR.value()) + .apiKey(apiKey) + .apiVersion(apiVersion) + .errorCode(errorCode) + .topic(topic) + ) + .build(); + EventFW event = eventRW + .wrap(eventBuffer, 0, eventBuffer.capacity()) + .id(produceErrorEventId) + .timestamp(clock.millis()) + .traceId(traceId) + .namespacedId(bindingId) + .extension(extension.buffer(), extension.offset(), extension.limit()) + .build(); + eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit()); + } + + public void groupAuthorizationFailed( + long traceId, + long bindingId, + int apiKey, + int apiVersion) + { + KafkaEventExFW extension = kafkaEventExRW + .wrap(extensionBuffer, 0, extensionBuffer.capacity()) + .groupAuthorizationFailed(e -> e + .typeId(GROUP_AUTHORIZATION_FAILED.value()) + .apiKey(apiKey) + .apiVersion(apiVersion) + ) + .build(); + EventFW event = eventRW + .wrap(eventBuffer, 0, eventBuffer.capacity()) + .id(groupAuthorizationFailedEventId) + .timestamp(clock.millis()) + .traceId(traceId) + .namespacedId(bindingId) + .extension(extension.buffer(), extension.offset(), extension.limit()) + .build(); + eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit()); + } + + public void transactionalIdAuthorizationFailed( + long traceId, + long bindingId, + int apiKey, + int apiVersion) + { + KafkaEventExFW extension = kafkaEventExRW + .wrap(extensionBuffer, 0, extensionBuffer.capacity()) + .transactionalIdAuthorizationFailed(e -> e + .typeId(TRANSACTIONAL_ID_AUTHORIZATION_FAILED.value()) + .apiKey(apiKey) + .apiVersion(apiVersion) + ) + .build(); + EventFW event = eventRW + .wrap(eventBuffer, 0, eventBuffer.capacity()) + .id(transactionalIdAuthorizationFailedEventId) + .timestamp(clock.millis()) + .traceId(traceId) + .namespacedId(bindingId) + .extension(extension.buffer(), extension.offset(), extension.limit()) + .build(); + eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit()); + } + + public void unsupportedSaslMechanism( + long traceId, + long bindingId, + String mechanism) + { + KafkaEventExFW extension = kafkaEventExRW + .wrap(extensionBuffer, 0, extensionBuffer.capacity()) + .unsupportedSaslMechanism(e -> e + .typeId(UNSUPPORTED_SASL_MECHANISM.value()) + .mechanism(mechanism) + ) + .build(); + EventFW event = eventRW + .wrap(eventBuffer, 0, eventBuffer.capacity()) + .id(unsupportedSaslMechanismEventId) + .timestamp(clock.millis()) + .traceId(traceId) + .namespacedId(bindingId) + .extension(extension.buffer(), extension.offset(), extension.limit()) + .build(); + eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit()); + } + + public void offsetCommitFailed( + long traceId, + long bindingId, + int apiKey, + int apiVersion, + int errorCode) + { + KafkaEventExFW extension = kafkaEventExRW + .wrap(extensionBuffer, 0, extensionBuffer.capacity()) + .offsetCommitFailed(e -> e + .typeId(OFFSET_COMMIT_FAILED.value()) + .apiKey(apiKey) + .apiVersion(apiVersion) + .errorCode(errorCode) + ) + .build(); + EventFW event = eventRW + .wrap(eventBuffer, 0, eventBuffer.capacity()) + .id(offsetCommitFailedEventId) + .timestamp(clock.millis()) + .traceId(traceId) + .namespacedId(bindingId) + .extension(extension.buffer(), extension.offset(), extension.limit()) + .build(); + eventWriter.accept(kafkaTypeId, event.buffer(), event.offset(), event.limit()); + } } diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventFormatter.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventFormatter.java index 2af2eb9a7d..ca7d1852fc 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventFormatter.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/events/KafkaEventFormatter.java @@ -21,10 +21,16 @@ import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.EventFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaApiVersionRejectedExFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaAuthorizationFailedExFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaBrokerConnectionFailedExFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaClusterAuthorizationFailedExFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaEventExFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaGroupAuthorizationFailedExFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaOffsetCommitFailedExFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaProduceErrorExFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaSaslAuthenticationFailedExFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaTopicAuthorizationFailedExFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaTransactionalIdAuthorizationFailedExFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.event.KafkaUnsupportedSaslMechanismExFW; import io.aklivity.zilla.runtime.engine.Configuration; import io.aklivity.zilla.runtime.engine.event.EventFormatterSpi; @@ -84,6 +90,50 @@ public String format( asString(ex.identity()), asString(ex.error())); break; } + case BROKER_CONNECTION_FAILED: + { + final KafkaBrokerConnectionFailedExFW ex = extension.brokerConnectionFailed(); + result = String.format("Broker connection failed for host (%s), port (%d).", + asString(ex.host()), ex.port()); + break; + } + case PRODUCE_ERROR: + { + final KafkaProduceErrorExFW ex = extension.produceError(); + KafkaApiKey apiKey = KafkaApiKey.of(ex.apiKey()); + result = String.format("%s (Version: %d) Produce error (%d) for topic (%s).", + apiKey.title(), ex.apiVersion(), ex.errorCode(), asString(ex.topic())); + break; + } + case GROUP_AUTHORIZATION_FAILED: + { + final KafkaGroupAuthorizationFailedExFW ex = extension.groupAuthorizationFailed(); + KafkaApiKey apiKey = KafkaApiKey.of(ex.apiKey()); + result = String.format("%s (Version: %d) Group authorization failed.", apiKey.title(), ex.apiVersion()); + break; + } + case TRANSACTIONAL_ID_AUTHORIZATION_FAILED: + { + final KafkaTransactionalIdAuthorizationFailedExFW ex = extension.transactionalIdAuthorizationFailed(); + KafkaApiKey apiKey = KafkaApiKey.of(ex.apiKey()); + result = String.format("%s (Version: %d) Transactional id authorization failed.", + apiKey.title(), ex.apiVersion()); + break; + } + case UNSUPPORTED_SASL_MECHANISM: + { + final KafkaUnsupportedSaslMechanismExFW ex = extension.unsupportedSaslMechanism(); + result = String.format("Unsupported SASL mechanism (%s).", asString(ex.mechanism())); + break; + } + case OFFSET_COMMIT_FAILED: + { + final KafkaOffsetCommitFailedExFW ex = extension.offsetCommitFailed(); + KafkaApiKey apiKey = KafkaApiKey.of(ex.apiKey()); + result = String.format("%s (Version: %d) Offset commit failed with error (%d).", + apiKey.title(), ex.apiVersion(), ex.errorCode()); + break; + } } return result; } diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheClientProduceFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheClientProduceFactory.java index 10edf39857..a6a7ceb02e 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheClientProduceFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheClientProduceFactory.java @@ -49,6 +49,7 @@ import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaBindingConfig; import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaRouteConfig; import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaTopicType; +import io.aklivity.zilla.runtime.binding.kafka.internal.events.KafkaEventContext; import io.aklivity.zilla.runtime.binding.kafka.internal.types.Array32FW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.Flyweight; import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaAckMode; @@ -101,6 +102,9 @@ public final class KafkaCacheClientProduceFactory implements BindingHandler private static final short PRODUCE_FLUSH_PRODUCER_EPOCH = -1; private static final int PRODUCE_FLUSH_SEQUENCE = -1; + private static final int PRODUCE_API_KEY = 0; + private static final int PRODUCE_API_VERSION = 3; + private static final int ERROR_CORRUPT_MESSAGE = 2; private static final int ERROR_NOT_LEADER_FOR_PARTITION = 6; private static final int ERROR_RECORD_LIST_TOO_LARGE = 18; @@ -153,6 +157,7 @@ public final class KafkaCacheClientProduceFactory implements BindingHandler private final KafkaCacheEntryFW entryRO = new KafkaCacheEntryFW(); private final int kafkaTypeId; + private final KafkaEventContext event; private final BufferPool bufferPool; private final BudgetCreditor creditor; private final Signaler signaler; @@ -185,6 +190,7 @@ public KafkaCacheClientProduceFactory( { this.context = context; this.kafkaTypeId = context.supplyTypeId(KafkaBinding.NAME); + this.event = new KafkaEventContext(context); this.writeBuffer = new UnsafeBuffer(new byte[context.writeBuffer().capacity()]); this.extBuffer = new UnsafeBuffer(new byte[context.writeBuffer().capacity()]); this.bufferPool = context.bufferPool(); @@ -767,6 +773,7 @@ private void onClientInitialData( if (error != NO_ERROR) { + event.produceError(traceId, originId, PRODUCE_API_KEY, PRODUCE_API_VERSION, error, topicName); stream.cleanupClient(traceId, error); onClientFanMemberClosed(traceId, stream); } @@ -811,6 +818,7 @@ stream.valueMark, stream.valueLimit, now().toEpochMilli(), stream.initialId, PRO if (error != NO_ERROR) { + event.produceError(traceId, originId, PRODUCE_API_KEY, PRODUCE_API_VERSION, error, topicName); stream.cleanupClient(traceId, error); onClientFanMemberClosed(traceId, stream); } diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientConnectionPool.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientConnectionPool.java index ba92efb769..1d49e78cda 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientConnectionPool.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientConnectionPool.java @@ -42,6 +42,7 @@ import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaConfiguration; import io.aklivity.zilla.runtime.binding.kafka.internal.budget.MergedBudgetCreditor; import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaBindingConfig; +import io.aklivity.zilla.runtime.binding.kafka.internal.events.KafkaEventContext; import io.aklivity.zilla.runtime.binding.kafka.internal.types.Flyweight; import io.aklivity.zilla.runtime.binding.kafka.internal.types.OctetsFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.ProxyAddressInetFW; @@ -81,6 +82,8 @@ public final class KafkaClientConnectionPool extends KafkaClientSaslHandshaker private static final int SIGNAL_CONNECTION_CLEANUP = 0x80000007; private static final int SIGNAL_NEXT_REQUEST = 0x80000008; + private static final int ERROR_UNSUPPORTED_SASL_MECHANISM = 33; + private final BeginFW beginRO = new BeginFW(); private final DataFW dataRO = new DataFW(); private final EndFW endRO = new EndFW(); @@ -128,6 +131,7 @@ public final class KafkaClientConnectionPool extends KafkaClientSaslHandshaker private final Object2ObjectHashMap connectionPool; private final Long2ObjectHashMap streamsByInitialId; private final long connectionPoolCleanupMillis; + private final KafkaEventContext event; public KafkaClientConnectionPool( KafkaConfiguration config, @@ -152,6 +156,7 @@ public KafkaClientConnectionPool( this.connectionPool = new Object2ObjectHashMap<>(); this.streamsByInitialId = new Long2ObjectHashMap<>(); this.connectionPoolCleanupMillis = config.clientConnectionPoolCleanupMillis(); + this.event = new KafkaEventContext(context); } private MessageConsumer newStream( @@ -1635,6 +1640,11 @@ private void onConnectionAbort( doConnectionAbort(traceId); + if (server != null) + { + event.brokerConnectionFailed(traceId, originId, server.host, server.port); + } + cleanupStreams(traceId); } @@ -1691,6 +1701,11 @@ private void onConnectionReset( cleanupBudgetCreditorIfNecessary(); + if (server != null) + { + event.brokerConnectionFailed(traceId, originId, server.host, server.port); + } + cleanupStreams(traceId); } @@ -2020,6 +2035,10 @@ protected void onDecodeSaslHandshakeResponse( decoder = decodeSaslAuthenticateResponse; break; default: + if (ERROR_UNSUPPORTED_SASL_MECHANISM == errorCode) + { + event.unsupportedSaslMechanism(traceId, originId, sasl.mechanism); + } cleanupConnection(traceId); break; } diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientSaslHandshaker.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientSaslHandshaker.java index 31b566f767..688cc317ce 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientSaslHandshaker.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientSaslHandshaker.java @@ -63,11 +63,18 @@ public abstract class KafkaClientSaslHandshaker private static final short SASL_AUTHENTICATE_API_KEY = 36; private static final short SASL_AUTHENTICATE_API_VERSION = 1; - private static final int ERROR_SASL_AUTHENTICATION_FAILED = 58; private static final int ERROR_NONE = 0; + private static final int ERROR_CORRUPT_MESSAGE = 2; + private static final int ERROR_OFFSET_METADATA_TOO_LARGE = 12; + private static final int ERROR_MESSAGE_TOO_LARGE = 10; + private static final int ERROR_RECORD_LIST_TOO_LARGE = 18; + private static final int ERROR_TOPIC_AUTHORIZATION_FAILED = 29; private static final int ERROR_CLUSTER_AUTHORIZATION_FAILED = 31; + private static final int ERROR_GROUP_AUTHORIZATION_FAILED = 30; private static final int ERROR_UNSUPPORTED_VERSION = 35; - private static final int ERROR_TOPIC_AUTHORIZATION_FAILED = 29; + private static final int ERROR_SASL_AUTHENTICATION_FAILED = 58; + private static final int ERROR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED = 53; + private static final int ERROR_INVALID_RECORD = 87; private static final String CLIENT_KEY = "Client Key"; private static final String SERVER_KEY = "Server Key"; @@ -449,10 +456,19 @@ protected final void onDecodeResponseErrorCode( { switch (errorCode) { - case ERROR_CLUSTER_AUTHORIZATION_FAILED -> event.clusterAuthorizationFailed(traceId, bindingId, apiKey, apiVersion); + case ERROR_CORRUPT_MESSAGE, ERROR_MESSAGE_TOO_LARGE, ERROR_RECORD_LIST_TOO_LARGE, ERROR_INVALID_RECORD -> + event.produceError(traceId, bindingId, apiKey, apiVersion, errorCode, topic); + case ERROR_OFFSET_METADATA_TOO_LARGE -> + event.offsetCommitFailed(traceId, bindingId, apiKey, apiVersion, errorCode); + case ERROR_TOPIC_AUTHORIZATION_FAILED -> + event.topicAuthorizationFailed(traceId, bindingId, apiKey, apiVersion, topic); + case ERROR_CLUSTER_AUTHORIZATION_FAILED -> + event.clusterAuthorizationFailed(traceId, bindingId, apiKey, apiVersion); + case ERROR_GROUP_AUTHORIZATION_FAILED -> + event.groupAuthorizationFailed(traceId, bindingId, apiKey, apiVersion); case ERROR_UNSUPPORTED_VERSION -> event.apiVersionRejected(traceId, bindingId, apiKey, apiVersion); - case ERROR_TOPIC_AUTHORIZATION_FAILED -> event.topicAuthorizationFailed(traceId, bindingId, apiKey, - apiVersion, topic); + case ERROR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED -> + event.transactionalIdAuthorizationFailed(traceId, bindingId, apiKey, apiVersion); } } diff --git a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientGroupIT.java b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientGroupIT.java index 74bd037312..7c467b53d8 100644 --- a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientGroupIT.java +++ b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientGroupIT.java @@ -267,6 +267,26 @@ public void shouldHandleInvalidSessionTimeout() throws Exception k3po.finish(); } + @Test + @Configuration("client.event.broker.connection.error.yaml") + @Specification({ + "${app}/broker.connection.error/client", + "${net}/broker.connection.error/server"}) + public void shouldHandleBrokerConnectionError() throws Exception + { + k3po.finish(); + } + + @Test + @Configuration("client.event.group.authorization.error.yaml") + @Specification({ + "${app}/group.authorization.failed/client", + "${net}/group.authorization.failed/server"}) + public void shouldHandleGroupAuthorizationError() throws Exception + { + k3po.finish(); + } + public static String supplyInstanceId() { return "zilla"; diff --git a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMetaSaslIT.java b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMetaSaslIT.java index 507b0f178d..79d7adaaad 100644 --- a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMetaSaslIT.java +++ b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientMetaSaslIT.java @@ -83,6 +83,16 @@ public void shouldReceiveSaslAuthenticationFailed() throws Exception k3po.finish(); } + @Test + @Configuration("client.event.unsupported.sasl.mechanism.yaml") + @Specification({ + "${app}/unsupported.sasl.mechanism/client", + "${net}/unsupported.sasl.mechanism/server"}) + public void shouldHandleUnsupportedSaslMechanism() throws Exception + { + k3po.finish(); + } + public static String supplyNonce() { return "fyko+d2lbbFgONRv9qkxdawL"; diff --git a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientOffsetCommitIT.java b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientOffsetCommitIT.java index af93ae7836..1b5948a838 100644 --- a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientOffsetCommitIT.java +++ b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientOffsetCommitIT.java @@ -77,4 +77,14 @@ public void shouldRejectUnknownTopicPartitionOffset() throws Exception { k3po.finish(); } + + @Test + @Configuration("client.event.offset.commit.error.yaml") + @Specification({ + "${app}/offset.commit.error/client", + "${net}/offset.commit.error/server"}) + public void shouldHandleOffsetCommitError() throws Exception + { + k3po.finish(); + } } diff --git a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientProduceIT.java b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientProduceIT.java index 3eff7cf91b..9f38a2e926 100644 --- a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientProduceIT.java +++ b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/ClientProduceIT.java @@ -377,4 +377,24 @@ public void shouldSendMessageValueSequential() throws Exception { k3po.finish(); } + + @Test + @Configuration("client.event.produce.error.yaml") + @Specification({ + "${app}/message.too.large/client", + "${net}/message.too.large/server"}) + public void shouldHandleProduceError() throws Exception + { + k3po.finish(); + } + + @Test + @Configuration("client.event.transactional.id.authorization.error.yaml") + @Specification({ + "${app}/transaction.id.authorization.error/client", + "${net}/transaction.id.authorization.error/server"}) + public void shouldHandleTransactionalIdAuthorizationError() throws Exception + { + k3po.finish(); + } } diff --git a/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl b/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl index 4677f3a27f..b79173d794 100644 --- a/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl +++ b/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl @@ -679,7 +679,13 @@ scope kafka API_VERSION_REJECTED (2), CLUSTER_AUTHORIZATION_FAILED (3), TOPIC_AUTHORIZATION_FAILED (4), - SASL_AUTHENTICATION_FAILED (5) + SASL_AUTHENTICATION_FAILED (5), + BROKER_CONNECTION_FAILED (6), + PRODUCE_ERROR (7), + GROUP_AUTHORIZATION_FAILED (8), + TRANSACTIONAL_ID_AUTHORIZATION_FAILED (9), + UNSUPPORTED_SASL_MECHANISM (10), + OFFSET_COMMIT_FAILED (11) } struct KafkaAuthorizationFailedEx extends core::stream::Extension @@ -712,6 +718,44 @@ scope kafka string16 error = null; } + struct KafkaBrokerConnectionFailedEx extends core::stream::Extension + { + string8 host; + int32 port; + } + + struct KafkaProduceErrorEx extends core::stream::Extension + { + int32 apiKey; + int32 apiVersion; + int32 errorCode; + string8 topic; + } + + struct KafkaGroupAuthorizationFailedEx extends core::stream::Extension + { + int32 apiKey; + int32 apiVersion; + } + + struct KafkaTransactionalIdAuthorizationFailedEx extends core::stream::Extension + { + int32 apiKey; + int32 apiVersion; + } + + struct KafkaUnsupportedSaslMechanismEx extends core::stream::Extension + { + string8 mechanism; + } + + struct KafkaOffsetCommitFailedEx extends core::stream::Extension + { + int32 apiKey; + int32 apiVersion; + int32 errorCode; + } + union KafkaEventEx switch (KafkaEventType) { case AUTHORIZATION_FAILED: KafkaAuthorizationFailedEx authorizationFailed; @@ -719,6 +763,12 @@ scope kafka case CLUSTER_AUTHORIZATION_FAILED: KafkaClusterAuthorizationFailedEx clusterAuthorizationFailed; case TOPIC_AUTHORIZATION_FAILED: KafkaTopicAuthorizationFailedEx topicAuthorizationFailed; case SASL_AUTHENTICATION_FAILED: KafkaSaslAuthenticationFailedEx saslAuthenticationFailed; + case BROKER_CONNECTION_FAILED: KafkaBrokerConnectionFailedEx brokerConnectionFailed; + case PRODUCE_ERROR: KafkaProduceErrorEx produceError; + case GROUP_AUTHORIZATION_FAILED: KafkaGroupAuthorizationFailedEx groupAuthorizationFailed; + case TRANSACTIONAL_ID_AUTHORIZATION_FAILED: KafkaTransactionalIdAuthorizationFailedEx transactionalIdAuthorizationFailed; + case UNSUPPORTED_SASL_MECHANISM: KafkaUnsupportedSaslMechanismEx unsupportedSaslMechanism; + case OFFSET_COMMIT_FAILED: KafkaOffsetCommitFailedEx offsetCommitFailed; } } } diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.broker.connection.error.yaml b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.broker.connection.error.yaml new file mode 100644 index 0000000000..da1ee3b786 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.broker.connection.error.yaml @@ -0,0 +1,45 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +--- +name: test +telemetry: + exporters: + exporter0: + type: test + options: + events: + - qname: engine:events + id: engine.started + name: ENGINE_STARTED + message: Engine Started. + - qname: test:app0 + id: binding.kafka.broker.connection.failed + name: BINDING_KAFKA_BROKER_CONNECTION_FAILED + message: "Broker connection failed for host (localhost), port (9092)." + - qname: engine:events + id: engine.stopped + name: ENGINE_STOPPED + message: Engine Stopped. +bindings: + app0: + type: kafka + kind: client + options: + servers: + - localhost:9092 + routes: + - exit: net0 diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.group.authorization.error.yaml b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.group.authorization.error.yaml new file mode 100644 index 0000000000..007666e6a6 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.group.authorization.error.yaml @@ -0,0 +1,41 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +--- +name: test +telemetry: + exporters: + exporter0: + type: test + options: + events: + - qname: engine:events + id: engine.started + name: ENGINE_STARTED + message: Engine Started. + - qname: test:app0 + id: binding.kafka.group.authorization.failed + name: BINDING_KAFKA_GROUP_AUTHORIZATION_FAILED + message: "FindCoordinator (Version: 1) Group authorization failed." + - qname: engine:events + id: engine.stopped + name: ENGINE_STOPPED + message: Engine Stopped. +bindings: + app0: + type: kafka + kind: client + exit: net0 diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.offset.commit.error.yaml b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.offset.commit.error.yaml new file mode 100644 index 0000000000..0e46a2e53a --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.offset.commit.error.yaml @@ -0,0 +1,41 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +--- +name: test +telemetry: + exporters: + exporter0: + type: test + options: + events: + - qname: engine:events + id: engine.started + name: ENGINE_STARTED + message: Engine Started. + - qname: test:app0 + id: binding.kafka.offset.commit.failed + name: BINDING_KAFKA_OFFSET_COMMIT_FAILED + message: "OffsetCommit (Version: 7) Offset commit failed with error (12)." + - qname: engine:events + id: engine.stopped + name: ENGINE_STOPPED + message: Engine Stopped. +bindings: + app0: + type: kafka + kind: client + exit: net0 diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.produce.error.yaml b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.produce.error.yaml new file mode 100644 index 0000000000..8b22d682cd --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.produce.error.yaml @@ -0,0 +1,41 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +--- +name: test +telemetry: + exporters: + exporter0: + type: test + options: + events: + - qname: engine:events + id: engine.started + name: ENGINE_STARTED + message: Engine Started. + - qname: test:app0 + id: binding.kafka.produce.error + name: BINDING_KAFKA_PRODUCE_ERROR + message: "Produce (Version: 3) Produce error (10) for topic (test)." + - qname: engine:events + id: engine.stopped + name: ENGINE_STOPPED + message: Engine Stopped. +bindings: + app0: + type: kafka + kind: client + exit: net0 diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.transactional.id.authorization.error.yaml b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.transactional.id.authorization.error.yaml new file mode 100644 index 0000000000..0f2a7a9e77 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.transactional.id.authorization.error.yaml @@ -0,0 +1,41 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +--- +name: test +telemetry: + exporters: + exporter0: + type: test + options: + events: + - qname: engine:events + id: engine.started + name: ENGINE_STARTED + message: Engine Started. + - qname: test:app0 + id: binding.kafka.transactional.id.authorization.failed + name: BINDING_KAFKA_TRANSACTIONAL_ID_AUTHORIZATION_FAILED + message: "Produce (Version: 3) Transactional id authorization failed." + - qname: engine:events + id: engine.stopped + name: ENGINE_STOPPED + message: Engine Stopped. +bindings: + app0: + type: kafka + kind: client + exit: net0 diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.unsupported.sasl.mechanism.yaml b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.unsupported.sasl.mechanism.yaml new file mode 100644 index 0000000000..0fa1dfbfb6 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/client.event.unsupported.sasl.mechanism.yaml @@ -0,0 +1,49 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +--- +name: test +telemetry: + exporters: + exporter0: + type: test + options: + events: + - qname: engine:events + id: engine.started + name: ENGINE_STARTED + message: Engine Started. + - qname: test:app0 + id: binding.kafka.unsupported.sasl.mechanism + name: BINDING_KAFKA_UNSUPPORTED_SASL_MECHANISM + message: "Unsupported SASL mechanism (plain)." + - qname: engine:events + id: engine.stopped + name: ENGINE_STOPPED + message: Engine Stopped. +bindings: + app0: + type: kafka + kind: client + options: + servers: + - localhost:9092 + sasl: + mechanism: plain + username: username + password: password + routes: + - exit: net0 diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/group/broker.connection.error/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/group/broker.connection.error/client.rpt new file mode 100644 index 0000000000..35693c9179 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/group/broker.connection.error/client.rpt @@ -0,0 +1,36 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .group() + .groupId("test") + .protocol("highlander") + .timeout(45000) + .build() + .build()} + +connected + +read zilla:reset.ext ${kafka:resetEx() + .typeId(zilla:id("kafka")) + .error(-1) + .build()} +write aborted diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/group/broker.connection.error/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/group/broker.connection.error/server.rpt new file mode 100644 index 0000000000..9874194903 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/group/broker.connection.error/server.rpt @@ -0,0 +1,41 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property serverAddress "zilla://streams/app0" + +accept ${serverAddress} + option zilla:window 8192 + option zilla:transmission "half-duplex" + +accepted + +read zilla:begin.ext ${kafka:matchBeginEx() + .typeId(zilla:id("kafka")) + .group() + .groupId("test") + .protocol("highlander") + .timeout(45000) + .build() + .build()} + +connected + +write zilla:reset.ext ${kafka:resetEx() + .typeId(zilla:id("kafka")) + .error(-1) + .build()} + +read abort diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/unsupported.sasl.mechanism/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/unsupported.sasl.mechanism/client.rpt new file mode 100644 index 0000000000..c93df37607 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/unsupported.sasl.mechanism/client.rpt @@ -0,0 +1,28 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +connect aborted diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/unsupported.sasl.mechanism/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/unsupported.sasl.mechanism/server.rpt new file mode 100644 index 0000000000..b1b16c2817 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/meta/unsupported.sasl.mechanism/server.rpt @@ -0,0 +1,23 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property serverAddress "zilla://streams/app0" + +accept ${serverAddress} + option zilla:window 8192 + option zilla:transmission "half-duplex" + +rejected diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/offset.commit/offset.commit.error/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/offset.commit/offset.commit.error/client.rpt new file mode 100644 index 0000000000..eb89878260 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/offset.commit/offset.commit.error/client.rpt @@ -0,0 +1,48 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .offsetCommit() + .groupId("client-1") + .memberId("memberId-1") + .instanceId("zilla") + .host("broker1.example.com") + .port(9092) + .build() + .build()} + +connected + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .offsetCommit() + .topic("test") + .progress(0, 2, "test-meta") + .generationId(0) + .leaderEpoch(0) + .build() + .build()} + +write zilla:data.empty +write flush + +write aborted +read abort diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/offset.commit/offset.commit.error/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/offset.commit/offset.commit.error/server.rpt new file mode 100644 index 0000000000..423f8509fd --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/offset.commit/offset.commit.error/server.rpt @@ -0,0 +1,50 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property serverAddress "zilla://streams/app0" + +accept ${serverAddress} + option zilla:window 8192 + option zilla:transmission "half-duplex" + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .offsetCommit() + .groupId("client-1") + .memberId("memberId-1") + .instanceId("zilla") + .host("broker1.example.com") + .port(9092) + .build() + .build()} + +connected + +read zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .offsetCommit() + .topic("test") + .progress(0, 2, "test-meta") + .generationId(0) + .leaderEpoch(0) + .build() + .build()} + +read zilla:data.empty + +read abort diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.too.large/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.too.large/client.rpt new file mode 100644 index 0000000000..9832603418 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.too.large/client.rpt @@ -0,0 +1,74 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property deltaMillis 0L +property newTimestamp ${kafka:timestamp() + deltaMillis} + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +connected + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +read zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .meta() + .partition(0, 177) + .build() + .build()} + +read notify ROUTED_BROKER_CLIENT + +connect await ROUTED_BROKER_CLIENT + "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + option zilla:affinity 177 + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} + +connected + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .produce() + .timestamp(newTimestamp) + .build() + .build()} +write flush + +write aborted +read abort diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.too.large/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.too.large/server.rpt new file mode 100644 index 0000000000..a39ec1f83e --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/message.too.large/server.rpt @@ -0,0 +1,69 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property serverAddress "zilla://streams/app0" + +accept ${serverAddress} + option zilla:window 8192 + option zilla:transmission "half-duplex" + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +connected + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} +write flush + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .meta() + .partition(0, 177) + .build() + .build()} +write flush + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} + +connected + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .produce() + .build() + .build()} +read zilla:data.null + +read abort diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/transaction.id.authorization.error/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/transaction.id.authorization.error/client.rpt new file mode 100644 index 0000000000..9832603418 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/transaction.id.authorization.error/client.rpt @@ -0,0 +1,74 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property deltaMillis 0L +property newTimestamp ${kafka:timestamp() + deltaMillis} + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +connected + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +read zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .meta() + .partition(0, 177) + .build() + .build()} + +read notify ROUTED_BROKER_CLIENT + +connect await ROUTED_BROKER_CLIENT + "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + option zilla:affinity 177 + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} + +connected + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .produce() + .timestamp(newTimestamp) + .build() + .build()} +write flush + +write aborted +read abort diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/transaction.id.authorization.error/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/transaction.id.authorization.error/server.rpt new file mode 100644 index 0000000000..a39ec1f83e --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/produce/transaction.id.authorization.error/server.rpt @@ -0,0 +1,69 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property serverAddress "zilla://streams/app0" + +accept ${serverAddress} + option zilla:window 8192 + option zilla:transmission "half-duplex" + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +connected + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} +write flush + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .meta() + .partition(0, 177) + .build() + .build()} +write flush + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .produce() + .topic("test") + .partition(0) + .build() + .build()} + +connected + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .produce() + .build() + .build()} +read zilla:data.null + +read abort diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/group.f1.j5.s3.l3.h3/broker.connection.error/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/group.f1.j5.s3.l3.h3/broker.connection.error/client.rpt new file mode 100644 index 0000000000..f6edc5f0dc --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/group.f1.j5.s3.l3.h3/broker.connection.error/client.rpt @@ -0,0 +1,26 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkConnectWindow 8192 + +connect "zilla://streams/net0" + option zilla:window ${networkConnectWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +connected + +read abort diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/group.f1.j5.s3.l3.h3/broker.connection.error/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/group.f1.j5.s3.l3.h3/broker.connection.error/server.rpt new file mode 100644 index 0000000000..d3338c4666 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/group.f1.j5.s3.l3.h3/broker.connection.error/server.rpt @@ -0,0 +1,26 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +accept "zilla://streams/net0" + option zilla:window 8192 + option zilla:transmission "duplex" + option zilla:byteorder "network" + +accepted + +connected + +write abort diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5.sasl.handshake.v1/unsupported.sasl.mechanism/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5.sasl.handshake.v1/unsupported.sasl.mechanism/client.rpt new file mode 100644 index 0000000000..35f1942d84 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5.sasl.handshake.v1/unsupported.sasl.mechanism/client.rpt @@ -0,0 +1,38 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkConnectWindow 8192 + +property newRequestId ${kafka:newRequestId()} + +connect "zilla://streams/net0" + option zilla:window ${networkConnectWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +connected + +write 22 # size + 17s # sasl.handshake + 1s # v1 + ${newRequestId} + 5s "zilla" # client id + 5s "PLAIN" # mechanism + +read 10 # size + ${newRequestId} + 33s # unsupported sasl mechanism + 0 # mechanisms count (empty) diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5.sasl.handshake.v1/unsupported.sasl.mechanism/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5.sasl.handshake.v1/unsupported.sasl.mechanism/server.rpt new file mode 100644 index 0000000000..413a459c3f --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/metadata.v5.sasl.handshake.v1/unsupported.sasl.mechanism/server.rpt @@ -0,0 +1,38 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkAcceptWindow 8192 + +accept "zilla://streams/net0" + option zilla:window ${networkAcceptWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +accepted + +connected + +read 22 # size + 17s # sasl.handshake + 1s # v1 + (int:requestId) + 5s "zilla" # client id + 5s "PLAIN" # mechanism + +write 10 # size + ${requestId} + 33s # unsupported sasl mechanism + 0 # mechanisms count (empty) diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/offset.commit.v7/offset.commit.error/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/offset.commit.v7/offset.commit.error/client.rpt new file mode 100644 index 0000000000..4bb2e7e20a --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/offset.commit.v7/offset.commit.error/client.rpt @@ -0,0 +1,65 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkConnectWindow 8192 + +property newRequestId ${kafka:newRequestId()} + +connect "zilla://streams/net0" + option zilla:window ${networkConnectWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +write zilla:begin.ext ${proxy:beginEx() + .typeId(zilla:id("proxy")) + .addressInet() + .protocol("stream") + .source("0.0.0.0") + .destination("broker1.example.com") + .sourcePort(0) + .destinationPort(9092) + .build() + .info() + .authority("broker1.example.com") + .build() + .build()} +connected + +write 89 # size + 8s # offset commit + 7s # 7 + ${newRequestId} + 5s "zilla" # client id + 8s "client-1" # group id + 0 + 10s "memberId-1" # consumer member group id + 5s "zilla" # group instance id + 1 # topics + 4s "test" # "test" topic + 1 # partitions + 0 # partition 0 + 2L # committed offset + 0 # committed leader epoch + 9s "test-meta" # metadata + +read 28 # size + (int:newRequestId) + 0 # throttle time ms + 1 # topics + 4s "test" # "test" topic + 1 # partitions + 0 # partition index + 12s # offset-metadata-too-large diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/offset.commit.v7/offset.commit.error/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/offset.commit.v7/offset.commit.error/server.rpt new file mode 100644 index 0000000000..b3298a52f8 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/offset.commit.v7/offset.commit.error/server.rpt @@ -0,0 +1,65 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkAcceptWindow 8192 + +accept "zilla://streams/net0" + option zilla:window ${networkAcceptWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +accepted + +read zilla:begin.ext ${proxy:matchBeginEx() + .typeId(zilla:id("proxy")) + .addressInet() + .protocol("stream") + .source("0.0.0.0") + .destination("broker1.example.com") + .sourcePort(0) + .destinationPort(9092) + .build() + .info() + .authority("broker1.example.com") + .build() + .build()} +connected + +read 89 # size + 8s # offset commit + 7s # 7 + (int:newRequestId) + 5s "zilla" # client id + 8s "client-1" # group id + 0 + 10s "memberId-1" # consumer member group id + 5s "zilla" # group instance id + 1 # topics + 4s "test" # "test" topic + 1 # partitions + 0 # partition 0 + 2L # committed offset + 0 # committed leader epoch + 9s "test-meta" # metadata + +write 28 # size + ${newRequestId} + 0 # throttle time ms + 1 # topics + 4s "test" # "test" topic + 1 # partitions + 0 # partition index + 12s # offset-metadata-too-large diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.too.large/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.too.large/client.rpt new file mode 100644 index 0000000000..c187266744 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.too.large/client.rpt @@ -0,0 +1,130 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkConnectWindow 8192 + +property newRequestId ${kafka:newRequestId()} +property produceWaitMax 500 + +property deltaMillis 0L +property newTimestamp ${kafka:timestamp() + deltaMillis} + +connect "zilla://streams/net0" + option zilla:window ${networkConnectWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +connected + +write 26 # size + 3s # metadata + 5s # v5 + ${newRequestId} + 5s "zilla" # client id + 1 # topics + 4s "test" # "test" topic + [0x00] # allow_auto_topic_creation + +read 97 # size + ${newRequestId} + [0..4] + 1 # brokers + 0xb1 # broker id + 19s "broker1.example.com" # host name + 9092 # port + -1s # no rack + 9s "cluster 1" # cluster id + 1 # controller id + 1 # topics + 0s # no error + 4s "test" # "test" topic + [0x00] # not internal + 1 # partitions + 0s # no error + 0 # partition + 0xb1 # leader + 0 # no replicas + -1 # no in-sync replicas + 0 # offline replicas + +read notify ROUTED_BROKER_SERVER + +connect await ROUTED_BROKER_SERVER + "zilla://streams/net0" + option zilla:window ${networkConnectWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +write zilla:begin.ext ${proxy:beginEx() + .typeId(zilla:id("proxy")) + .addressInet() + .protocol("stream") + .source("0.0.0.0") + .destination("broker1.example.com") + .sourcePort(0) + .destinationPort(9092) + .build() + .info() + .authority("broker1.example.com") + .build() + .build()} + +connected + +write 113 # size + 0s # produce + 3s # v3 + ${newRequestId} + 5s "zilla" # client id + -1s # transactional id + 0s # acks + ${produceWaitMax} + 1 + 4s "test" + 1 + 0 # partition + 68 # record set size + 0L # first offset + 56 # length + -1 + [0x02] + 0x4e8723aa + 0s + 0 # last offset delta + ${newTimestamp} # first timestamp + ${newTimestamp} # last timestamp + -1L + -1s + -1 + 1 # records + ${kafka:varint(6)} + [0x00] + ${kafka:varint(0)} + ${kafka:varint(0)} + ${kafka:varint(-1)} + ${kafka:varint(-1)} + ${kafka:varint(0)} # headers + +read 44 + ${newRequestId} + 1 # topics + 4s "test" + 1 # partitions + 0 # partition + 10s # message-too-large + [0..8] # base offset + [0..8] # log append time + [0..4] # throttle ms diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.too.large/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.too.large/server.rpt new file mode 100644 index 0000000000..8682f76dfa --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/message.too.large/server.rpt @@ -0,0 +1,123 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkAcceptWindow 8192 + +property deltaMillis 0L +property newTimestamp ${kafka:timestamp() + deltaMillis} + +accept "zilla://streams/net0" + option zilla:window ${networkAcceptWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +accepted + +connected + +read 26 # size + 3s # metadata + 5s # v5 + (int:requestId) + 5s "zilla" # client id + 1 # topics + 4s "test" # "test" topic + [0x00] # allow_auto_topic_creation + +write 97 # size + ${requestId} + 0 + 1 # brokers + 0xb1 # broker id + 19s "broker1.example.com" # host name + 9092 # port + -1s # no rack + 9s "cluster 1" # cluster id + 1 # controller id + 1 # topics + 0s # no error + 4s "test" # "test" topic + [0x00] # not internal + 1 # partitions + 0s # no error + 0 # partition + 0xb1 # leader + 0 # no replicas + -1 # no in-sync replicas + 0 # offline replicas + +accepted + +read zilla:begin.ext ${proxy:matchBeginEx() + .typeId(zilla:id("proxy")) + .addressInet() + .protocol("stream") + .source("0.0.0.0") + .destination("broker1.example.com") + .sourcePort(0) + .destinationPort(9092) + .build() + .info() + .authority("broker1.example.com") + .build() + .build()} + +connected + +read 113 + 0s + 3s + (int:requestId) + 5s "zilla" # client id + -1s + [0..2] + [0..4] + 1 + 4s "test" + 1 + 0 + 68 # record set size + 0L # first offset + 56 # length + -1 + [0x02] + [0..4] + 0s + 0 # last offset delta + (long:timestamp) # first timestamp + ${timestamp} # last timestamp + -1L + -1s + -1 + 1 # records + ${kafka:varint(6)} + [0x00] + ${kafka:varint(0)} + ${kafka:varint(0)} + ${kafka:varint(-1)} + ${kafka:varint(-1)} + ${kafka:varint(0)} # headers + +write 44 + ${requestId} + 1 # topics + 4s "test" + 1 # partitions + 0 # partition 0 + 10s # message-too-large + -1L # base offset + 0L # log append time + 0 # throttle diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/transaction.id.authorization.error/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/transaction.id.authorization.error/client.rpt new file mode 100644 index 0000000000..5a29ce4dda --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/transaction.id.authorization.error/client.rpt @@ -0,0 +1,130 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkConnectWindow 8192 + +property newRequestId ${kafka:newRequestId()} +property produceWaitMax 500 + +property deltaMillis 0L +property newTimestamp ${kafka:timestamp() + deltaMillis} + +connect "zilla://streams/net0" + option zilla:window ${networkConnectWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +connected + +write 26 # size + 3s # metadata + 5s # v5 + ${newRequestId} + 5s "zilla" # client id + 1 # topics + 4s "test" # "test" topic + [0x00] # allow_auto_topic_creation + +read 97 # size + ${newRequestId} + [0..4] + 1 # brokers + 0xb1 # broker id + 19s "broker1.example.com" # host name + 9092 # port + -1s # no rack + 9s "cluster 1" # cluster id + 1 # controller id + 1 # topics + 0s # no error + 4s "test" # "test" topic + [0x00] # not internal + 1 # partitions + 0s # no error + 0 # partition + 0xb1 # leader + 0 # no replicas + -1 # no in-sync replicas + 0 # offline replicas + +read notify ROUTED_BROKER_SERVER + +connect await ROUTED_BROKER_SERVER + "zilla://streams/net0" + option zilla:window ${networkConnectWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +write zilla:begin.ext ${proxy:beginEx() + .typeId(zilla:id("proxy")) + .addressInet() + .protocol("stream") + .source("0.0.0.0") + .destination("broker1.example.com") + .sourcePort(0) + .destinationPort(9092) + .build() + .info() + .authority("broker1.example.com") + .build() + .build()} + +connected + +write 113 # size + 0s # produce + 3s # v3 + ${newRequestId} + 5s "zilla" # client id + -1s # transactional id + 0s # acks + ${produceWaitMax} + 1 + 4s "test" + 1 + 0 # partition + 68 # record set size + 0L # first offset + 56 # length + -1 + [0x02] + 0x4e8723aa + 0s + 0 # last offset delta + ${newTimestamp} # first timestamp + ${newTimestamp} # last timestamp + -1L + -1s + -1 + 1 # records + ${kafka:varint(6)} + [0x00] + ${kafka:varint(0)} + ${kafka:varint(0)} + ${kafka:varint(-1)} + ${kafka:varint(-1)} + ${kafka:varint(0)} # headers + +read 44 + ${newRequestId} + 1 # topics + 4s "test" + 1 # partitions + 0 # partition + 53s # transactional-id-authorization-failed + [0..8] # base offset + [0..8] # log append time + [0..4] # throttle ms diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/transaction.id.authorization.error/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/transaction.id.authorization.error/server.rpt new file mode 100644 index 0000000000..03cc5c8c17 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/network/produce.v3/transaction.id.authorization.error/server.rpt @@ -0,0 +1,123 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property networkAcceptWindow 8192 + +property deltaMillis 0L +property newTimestamp ${kafka:timestamp() + deltaMillis} + +accept "zilla://streams/net0" + option zilla:window ${networkAcceptWindow} + option zilla:transmission "duplex" + option zilla:byteorder "network" + +accepted + +connected + +read 26 # size + 3s # metadata + 5s # v5 + (int:requestId) + 5s "zilla" # client id + 1 # topics + 4s "test" # "test" topic + [0x00] # allow_auto_topic_creation + +write 97 # size + ${requestId} + 0 + 1 # brokers + 0xb1 # broker id + 19s "broker1.example.com" # host name + 9092 # port + -1s # no rack + 9s "cluster 1" # cluster id + 1 # controller id + 1 # topics + 0s # no error + 4s "test" # "test" topic + [0x00] # not internal + 1 # partitions + 0s # no error + 0 # partition + 0xb1 # leader + 0 # no replicas + -1 # no in-sync replicas + 0 # offline replicas + +accepted + +read zilla:begin.ext ${proxy:matchBeginEx() + .typeId(zilla:id("proxy")) + .addressInet() + .protocol("stream") + .source("0.0.0.0") + .destination("broker1.example.com") + .sourcePort(0) + .destinationPort(9092) + .build() + .info() + .authority("broker1.example.com") + .build() + .build()} + +connected + +read 113 + 0s + 3s + (int:requestId) + 5s "zilla" # client id + -1s + [0..2] + [0..4] + 1 + 4s "test" + 1 + 0 + 68 # record set size + 0L # first offset + 56 # length + -1 + [0x02] + [0..4] + 0s + 0 # last offset delta + (long:timestamp) # first timestamp + ${timestamp} # last timestamp + -1L + -1s + -1 + 1 # records + ${kafka:varint(6)} + [0x00] + ${kafka:varint(0)} + ${kafka:varint(0)} + ${kafka:varint(-1)} + ${kafka:varint(-1)} + ${kafka:varint(0)} # headers + +write 44 + ${requestId} + 1 # topics + 4s "test" + 1 # partitions + 0 # partition 0 + 53s # transactional-id-authorization-failed + -1L # base offset + 0L # log append time + 0 # throttle diff --git a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/GroupIT.java b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/GroupIT.java index a6b05d5c42..a18f0692f5 100644 --- a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/GroupIT.java +++ b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/GroupIT.java @@ -216,4 +216,13 @@ public void shouldHandleServerSentReadAbortAfterJoinGroup() throws Exception { k3po.finish(); } + + @Test + @Specification({ + "${app}/broker.connection.error/client", + "${app}/broker.connection.error/server"}) + public void shouldHandleBrokerConnectionError() throws Exception + { + k3po.finish(); + } } diff --git a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/MetaIT.java b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/MetaIT.java index 138638923c..dd0efafb61 100644 --- a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/MetaIT.java +++ b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/MetaIT.java @@ -93,4 +93,13 @@ public void shouldReceiveTopicPartitionInfoChanged() throws Exception k3po.notifyBarrier("SEND_SECOND_META"); k3po.finish(); } + + @Test + @Specification({ + "${app}/unsupported.sasl.mechanism/client", + "${app}/unsupported.sasl.mechanism/server"}) + public void shouldHandleUnsupportedSaslMechanism() throws Exception + { + k3po.finish(); + } } diff --git a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/OffsetCommitIT.java b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/OffsetCommitIT.java index dbcc40bffe..378723d0a7 100644 --- a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/OffsetCommitIT.java +++ b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/OffsetCommitIT.java @@ -63,4 +63,13 @@ public void shouldRejectUnknownTopicPartitionOffset() throws Exception { k3po.finish(); } + + @Test + @Specification({ + "${app}/offset.commit.error/client", + "${app}/offset.commit.error/server"}) + public void shouldHandleOffsetCommitError() throws Exception + { + k3po.finish(); + } } diff --git a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/ProduceIT.java b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/ProduceIT.java index da8a5a48b8..1725924ef8 100644 --- a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/ProduceIT.java +++ b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/application/ProduceIT.java @@ -382,4 +382,22 @@ public void shouldRejectMessageValues() throws Exception { k3po.finish(); } + + @Test + @Specification({ + "${app}/message.too.large/client", + "${app}/message.too.large/server"}) + public void shouldRejectMessageTooLarge() throws Exception + { + k3po.finish(); + } + + @Test + @Specification({ + "${app}/transaction.id.authorization.error/client", + "${app}/transaction.id.authorization.error/server"}) + public void shouldRejectTransactionalIdAuthorizationError() throws Exception + { + k3po.finish(); + } } diff --git a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/GroupIT.java b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/GroupIT.java index df5b80d10a..71c20ef79f 100644 --- a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/GroupIT.java +++ b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/GroupIT.java @@ -216,4 +216,13 @@ public void shouldCreateConnectionForJoinGroup() throws Exception { k3po.finish(); } + + @Test + @Specification({ + "${net}/broker.connection.error/client", + "${net}/broker.connection.error/server"}) + public void shouldHandleBrokerConnectionError() throws Exception + { + k3po.finish(); + } } diff --git a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/MetadataSaslIT.java b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/MetadataSaslIT.java index ac4be2154f..76ffcd4668 100644 --- a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/MetadataSaslIT.java +++ b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/MetadataSaslIT.java @@ -54,4 +54,13 @@ public void shouldRequestTopicPartitionWithSaslScram() throws Exception { k3po.finish(); } + + @Test + @Specification({ + "${net}/unsupported.sasl.mechanism/client", + "${net}/unsupported.sasl.mechanism/server"}) + public void shouldHandleUnsupportedSaslMechanism() throws Exception + { + k3po.finish(); + } } diff --git a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/OffsetCommitIT.java b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/OffsetCommitIT.java index bf890953d2..92b16dfb4c 100644 --- a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/OffsetCommitIT.java +++ b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/OffsetCommitIT.java @@ -63,4 +63,13 @@ public void shouldRejectUnknownTopicPartitionOffset() throws Exception { k3po.finish(); } + + @Test + @Specification({ + "${net}/offset.commit.error/client", + "${net}/offset.commit.error/server"}) + public void shouldHandleOffsetCommitError() throws Exception + { + k3po.finish(); + } } diff --git a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/ProduceIT.java b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/ProduceIT.java index ffb40030bc..964938c926 100644 --- a/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/ProduceIT.java +++ b/specs/binding-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/kafka/streams/network/ProduceIT.java @@ -301,4 +301,22 @@ public void shouldSendMessageHeadersRepeated() throws Exception { k3po.finish(); } + + @Test + @Specification({ + "${net}/message.too.large/client", + "${net}/message.too.large/server"}) + public void shouldRejectMessageTooLarge() throws Exception + { + k3po.finish(); + } + + @Test + @Specification({ + "${net}/transaction.id.authorization.error/client", + "${net}/transaction.id.authorization.error/server"}) + public void shouldRejectTransactionalIdAuthorizationError() throws Exception + { + k3po.finish(); + } }