diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/config/MqttKafkaOptionsConfig.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/config/MqttKafkaOptionsConfig.java index 42fc5e87aa..5ce049401e 100644 --- a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/config/MqttKafkaOptionsConfig.java +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/config/MqttKafkaOptionsConfig.java @@ -23,6 +23,7 @@ public class MqttKafkaOptionsConfig extends OptionsConfig { public final MqttKafkaTopicsConfig topics; public final String serverRef; + public final String store; public final List clients; public final MqttKafkaPublishConfig publish; @@ -40,11 +41,13 @@ public static MqttKafkaOptionsConfigBuilder builder( MqttKafkaOptionsConfig( MqttKafkaTopicsConfig topics, String serverRef, + String store, List clients, MqttKafkaPublishConfig publish) { this.topics = topics; this.serverRef = serverRef; + this.store = store; this.clients = clients; this.publish = publish; } diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/config/MqttKafkaOptionsConfigBuilder.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/config/MqttKafkaOptionsConfigBuilder.java index eaf0b71959..6a0e1be8ea 100644 --- a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/config/MqttKafkaOptionsConfigBuilder.java +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/config/MqttKafkaOptionsConfigBuilder.java @@ -26,6 +26,7 @@ public class MqttKafkaOptionsConfigBuilder extends ConfigBuilder clients; private MqttKafkaPublishConfig publish; @@ -62,6 +63,13 @@ public MqttKafkaOptionsConfigBuilder serverRef( return this; } + public MqttKafkaOptionsConfigBuilder store( + String store) + { + this.store = store; + return this; + } + public MqttKafkaOptionsConfigBuilder clients( List clients) { @@ -85,6 +93,6 @@ public MqttKafkaPublishConfigBuilder> publish() @Override public T build() { - return mapper.apply(new MqttKafkaOptionsConfig(topics, serverRef, clients, publish)); + return mapper.apply(new MqttKafkaOptionsConfig(topics, serverRef, store, clients, publish)); } } diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/config/MqttKafkaOptionsConfigAdapter.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/config/MqttKafkaOptionsConfigAdapter.java index ad0b0216e6..b7d64bcf0e 100644 --- a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/config/MqttKafkaOptionsConfigAdapter.java +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/config/MqttKafkaOptionsConfigAdapter.java @@ -38,6 +38,7 @@ public class MqttKafkaOptionsConfigAdapter implements OptionsConfigAdapterSpi, J { private static final String TOPICS_NAME = "topics"; private static final String SERVER_NAME = "server"; + private static final String STORE_NAME = "store"; private static final String CLIENTS_NAME = "clients"; private static final String SESSIONS_NAME = "sessions"; private static final String MESSAGES_NAME = "messages"; @@ -66,6 +67,7 @@ public JsonObject adaptToJson( JsonObjectBuilder object = Json.createObjectBuilder(); String serverRef = mqttKafkaOptions.serverRef; + String store = mqttKafkaOptions.store; MqttKafkaTopicsConfig topics = mqttKafkaOptions.topics; List clients = mqttKafkaOptions.clients; @@ -73,6 +75,10 @@ public JsonObject adaptToJson( { object.add(SERVER_NAME, serverRef); } + if (store != null) + { + object.add(STORE_NAME, store); + } if (topics != null) { JsonObjectBuilder newTopics = Json.createObjectBuilder(); @@ -113,6 +119,7 @@ public OptionsConfig adaptFromJson( MqttKafkaOptionsConfigBuilder options = MqttKafkaOptionsConfig.builder(); JsonObject topics = object.getJsonObject(TOPICS_NAME); options.serverRef(object.getString(SERVER_NAME, null)); + options.store(object.getString(STORE_NAME, null)); JsonArray clientsJson = object.getJsonArray(CLIENTS_NAME); JsonObject publish = object.getJsonObject(PUBLISH_NAME); diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java index f1cae2f4c3..6911e152c5 100644 --- a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java @@ -377,6 +377,11 @@ public void onAttached( this.groupIdPrefix = String.format(groupIdPrefixFormat, supplyNamespace.apply(bindingId), supplyLocalName.apply(bindingId)); + if (binding.options.store == null && coreIndex == 0) + { + warnSessionOwnershipDeprecated(supplyLocalName.apply(bindingId)); + } + if (willAvailable && coreIndex == 0) { Optional route = binding.routes.stream().findFirst(); @@ -389,6 +394,25 @@ public void onAttached( sessionIds.put(bindingId, supplySessionId.get()); } + private void warnSessionOwnershipDeprecated( + String bindingName) + { + System.out.printf( + "WARN [%s] Session ownership coordination via Kafka consumer group is deprecated. " + + "The 'store' option will be required in an upcoming release. " + + "To prepare, configure a store reference on this binding:%n" + + "%n" + + " stores:%n" + + " memory0:%n" + + " type: memory%n" + + "%n" + + " bindings:%n" + + " %s:%n" + + " options:%n" + + " store: memory0%n", + bindingName, bindingName); + } + @Override public void onDetached( long bindingId) diff --git a/runtime/binding-mqtt-kafka/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/config/MqttKafkaOptionsConfigAdapterTest.java b/runtime/binding-mqtt-kafka/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/config/MqttKafkaOptionsConfigAdapterTest.java index ca79deefe0..f1bf298d7b 100644 --- a/runtime/binding-mqtt-kafka/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/config/MqttKafkaOptionsConfigAdapterTest.java +++ b/runtime/binding-mqtt-kafka/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/config/MqttKafkaOptionsConfigAdapterTest.java @@ -109,6 +109,53 @@ public void shouldWriteOptions() "}")); } + @Test + public void shouldReadOptionsWithStore() + { + String text = + "{" + + "\"store\":\"memory0\"," + + "\"topics\":" + + "{" + + "\"sessions\":\"sessions\"," + + "\"messages\":\"messages\"," + + "\"retained\":\"retained\"" + + "}" + + "}"; + + MqttKafkaOptionsConfig options = jsonb.fromJson(text, MqttKafkaOptionsConfig.class); + + assertThat(options, not(nullValue())); + assertThat(options.store, equalTo("memory0")); + } + + @Test + public void shouldWriteOptionsWithStore() + { + MqttKafkaOptionsConfig options = MqttKafkaOptionsConfig.builder() + .topics(MqttKafkaTopicsConfig.builder() + .sessions("sessions") + .messages("messages") + .retained("retained") + .build()) + .store("memory0") + .build(); + + String text = jsonb.toJson(options); + + assertThat(text, not(nullValue())); + assertThat(text, equalTo( + "{" + + "\"store\":\"memory0\"," + + "\"topics\":" + + "{" + + "\"sessions\":\"sessions\"," + + "\"messages\":\"messages\"," + + "\"retained\":\"retained\"" + + "}" + + "}")); + } + @Test public void shouldReadOptionsWithoutClients() { diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/config/proxy.store.yaml b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/config/proxy.store.yaml new file mode 100644 index 0000000000..ed42c11502 --- /dev/null +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/config/proxy.store.yaml @@ -0,0 +1,28 @@ +# +# Copyright 2021-2024 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +--- +name: test +bindings: + mqtt0: + type: mqtt-kafka + kind: proxy + options: + store: memory0 + topics: + sessions: sessions + messages: messages + retained: retained + exit: kafka0 diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/schema/mqtt.kafka.schema.patch.json b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/schema/mqtt.kafka.schema.patch.json index ebfdc6012b..cf0ab280f6 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/schema/mqtt.kafka.schema.patch.json +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/schema/mqtt.kafka.schema.patch.json @@ -43,6 +43,11 @@ "title": "Server Reference", "type": "string" }, + "store": + { + "title": "Store Reference", + "type": "string" + }, "topics": { "title": "Topics", diff --git a/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/config/SchemaTest.java b/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/config/SchemaTest.java index faef889bf8..42d2147802 100644 --- a/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/config/SchemaTest.java +++ b/specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/config/SchemaTest.java @@ -50,6 +50,14 @@ public void shouldValidateProxyWithOptions() assertThat(config, not(nullValue())); } + @Test + public void shouldValidateProxyWithStore() + { + JsonObject config = schema.validate("proxy.store.yaml"); + + assertThat(config, not(nullValue())); + } + @Test public void shouldValidateProxyWithPublishQosMax() {