Skip to content

Commit 9b2aef8

Browse files
jfallowsclaude
andauthored
refactor(binding-mqtt): move session-ownership store option and warning from mqtt-kafka (#1804)
MQTT session ownership is a generic MQTT concern, so the store option that will gate it belongs on the mqtt server binding rather than mqtt-kafka. Move the 'store' option (config, builder, adapter, schema) and the "store will be required" deprecation warning to binding-mqtt, where the warning is logged once per worker for server-kind bindings that have no store configured. Remove the option and warning from mqtt-kafka. The ownership logic migration follows separately. Co-authored-by: Claude <noreply@anthropic.com>
1 parent e53a358 commit 9b2aef8

16 files changed

Lines changed: 96 additions & 135 deletions

File tree

runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/config/MqttKafkaOptionsConfig.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ public class MqttKafkaOptionsConfig extends OptionsConfig
2323
{
2424
public final MqttKafkaTopicsConfig topics;
2525
public final String serverRef;
26-
public final String store;
2726
public final List<String> clients;
2827
public final MqttKafkaPublishConfig publish;
2928

@@ -41,13 +40,11 @@ public static <T> MqttKafkaOptionsConfigBuilder<T> builder(
4140
MqttKafkaOptionsConfig(
4241
MqttKafkaTopicsConfig topics,
4342
String serverRef,
44-
String store,
4543
List<String> clients,
4644
MqttKafkaPublishConfig publish)
4745
{
4846
this.topics = topics;
4947
this.serverRef = serverRef;
50-
this.store = store;
5148
this.clients = clients;
5249
this.publish = publish;
5350
}

runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/config/MqttKafkaOptionsConfigBuilder.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ public class MqttKafkaOptionsConfigBuilder<T> extends ConfigBuilder<T, MqttKafka
2626

2727
private MqttKafkaTopicsConfig topics;
2828
private String serverRef;
29-
private String store;
3029
private List<String> clients;
3130
private MqttKafkaPublishConfig publish;
3231

@@ -63,13 +62,6 @@ public MqttKafkaOptionsConfigBuilder<T> serverRef(
6362
return this;
6463
}
6564

66-
public MqttKafkaOptionsConfigBuilder<T> store(
67-
String store)
68-
{
69-
this.store = store;
70-
return this;
71-
}
72-
7365
public MqttKafkaOptionsConfigBuilder<T> clients(
7466
List<String> clients)
7567
{
@@ -93,6 +85,6 @@ public MqttKafkaPublishConfigBuilder<MqttKafkaOptionsConfigBuilder<T>> publish()
9385
@Override
9486
public T build()
9587
{
96-
return mapper.apply(new MqttKafkaOptionsConfig(topics, serverRef, store, clients, publish));
88+
return mapper.apply(new MqttKafkaOptionsConfig(topics, serverRef, clients, publish));
9789
}
9890
}

runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/config/MqttKafkaOptionsConfigAdapter.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ public class MqttKafkaOptionsConfigAdapter implements OptionsConfigAdapterSpi, J
3838
{
3939
private static final String TOPICS_NAME = "topics";
4040
private static final String SERVER_NAME = "server";
41-
private static final String STORE_NAME = "store";
4241
private static final String CLIENTS_NAME = "clients";
4342
private static final String SESSIONS_NAME = "sessions";
4443
private static final String MESSAGES_NAME = "messages";
@@ -67,18 +66,13 @@ public JsonObject adaptToJson(
6766
JsonObjectBuilder object = Json.createObjectBuilder();
6867

6968
String serverRef = mqttKafkaOptions.serverRef;
70-
String store = mqttKafkaOptions.store;
7169
MqttKafkaTopicsConfig topics = mqttKafkaOptions.topics;
7270
List<String> clients = mqttKafkaOptions.clients;
7371

7472
if (serverRef != null)
7573
{
7674
object.add(SERVER_NAME, serverRef);
7775
}
78-
if (store != null)
79-
{
80-
object.add(STORE_NAME, store);
81-
}
8276
if (topics != null)
8377
{
8478
JsonObjectBuilder newTopics = Json.createObjectBuilder();
@@ -119,7 +113,6 @@ public OptionsConfig adaptFromJson(
119113
MqttKafkaOptionsConfigBuilder<MqttKafkaOptionsConfig> options = MqttKafkaOptionsConfig.builder();
120114
JsonObject topics = object.getJsonObject(TOPICS_NAME);
121115
options.serverRef(object.getString(SERVER_NAME, null));
122-
options.store(object.getString(STORE_NAME, null));
123116
JsonArray clientsJson = object.getJsonArray(CLIENTS_NAME);
124117
JsonObject publish = object.getJsonObject(PUBLISH_NAME);
125118

runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -377,11 +377,6 @@ public void onAttached(
377377
this.groupIdPrefix =
378378
String.format(groupIdPrefixFormat, supplyNamespace.apply(bindingId), supplyLocalName.apply(bindingId));
379379

380-
if (binding.options.store == null && coreIndex == 0)
381-
{
382-
warnSessionOwnershipDeprecated(supplyLocalName.apply(bindingId));
383-
}
384-
385380
if (willAvailable && coreIndex == 0)
386381
{
387382
Optional<MqttKafkaRouteConfig> route = binding.routes.stream().findFirst();
@@ -394,25 +389,6 @@ public void onAttached(
394389
sessionIds.put(bindingId, supplySessionId.get());
395390
}
396391

397-
private void warnSessionOwnershipDeprecated(
398-
String bindingName)
399-
{
400-
System.out.printf(
401-
"WARN [%s] Session ownership coordination via Kafka consumer group is deprecated. " +
402-
"The 'store' option will be required in an upcoming release. " +
403-
"To prepare, configure a store reference on this binding:%n" +
404-
"%n" +
405-
" stores:%n" +
406-
" memory0:%n" +
407-
" type: memory%n" +
408-
"%n" +
409-
" bindings:%n" +
410-
" %s:%n" +
411-
" options:%n" +
412-
" store: memory0%n",
413-
bindingName, bindingName);
414-
}
415-
416392
@Override
417393
public void onDetached(
418394
long bindingId)

runtime/binding-mqtt-kafka/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/config/MqttKafkaOptionsConfigAdapterTest.java

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -109,53 +109,6 @@ public void shouldWriteOptions()
109109
"}"));
110110
}
111111

112-
@Test
113-
public void shouldReadOptionsWithStore()
114-
{
115-
String text =
116-
"{" +
117-
"\"store\":\"memory0\"," +
118-
"\"topics\":" +
119-
"{" +
120-
"\"sessions\":\"sessions\"," +
121-
"\"messages\":\"messages\"," +
122-
"\"retained\":\"retained\"" +
123-
"}" +
124-
"}";
125-
126-
MqttKafkaOptionsConfig options = jsonb.fromJson(text, MqttKafkaOptionsConfig.class);
127-
128-
assertThat(options, not(nullValue()));
129-
assertThat(options.store, equalTo("memory0"));
130-
}
131-
132-
@Test
133-
public void shouldWriteOptionsWithStore()
134-
{
135-
MqttKafkaOptionsConfig options = MqttKafkaOptionsConfig.builder()
136-
.topics(MqttKafkaTopicsConfig.builder()
137-
.sessions("sessions")
138-
.messages("messages")
139-
.retained("retained")
140-
.build())
141-
.store("memory0")
142-
.build();
143-
144-
String text = jsonb.toJson(options);
145-
146-
assertThat(text, not(nullValue()));
147-
assertThat(text, equalTo(
148-
"{" +
149-
"\"store\":\"memory0\"," +
150-
"\"topics\":" +
151-
"{" +
152-
"\"sessions\":\"sessions\"," +
153-
"\"messages\":\"messages\"," +
154-
"\"retained\":\"retained\"" +
155-
"}" +
156-
"}"));
157-
}
158-
159112
@Test
160113
public void shouldReadOptionsWithoutClients()
161114
{

runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttOptionsConfig.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public class MqttOptionsConfig extends OptionsConfig
3434
public final MqttAuthorizationConfig authorization;
3535
public final List<MqttTopicConfig> topics;
3636
public final List<MqttVersion> versions;
37+
public final String store;
3738

3839
public static MqttOptionsConfigBuilder<MqttOptionsConfig> builder()
3940
{
@@ -49,12 +50,14 @@ public static <T> MqttOptionsConfigBuilder<T> builder(
4950
public MqttOptionsConfig(
5051
MqttAuthorizationConfig authorization,
5152
List<MqttTopicConfig> topics,
52-
List<MqttVersion> versions)
53+
List<MqttVersion> versions,
54+
String store)
5355
{
5456
super(resolveModels(topics), List.of());
5557
this.authorization = authorization;
5658
this.topics = topics;
5759
this.versions = versions;
60+
this.store = store;
5861
}
5962

6063
private static List<ModelConfig> resolveModels(

runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/config/MqttOptionsConfigBuilder.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public class MqttOptionsConfigBuilder<T> extends ConfigBuilder<T, MqttOptionsCon
3030
private MqttAuthorizationConfig authorization;
3131
private List<MqttTopicConfig> topics;
3232
private List<MqttVersion> versions;
33+
private String store;
3334

3435
MqttOptionsConfigBuilder(
3536
Function<OptionsConfig, T> mapper)
@@ -106,9 +107,16 @@ public MqttAuthorizationConfigBuilder<MqttOptionsConfigBuilder<T>> authorization
106107
return new MqttAuthorizationConfigBuilder<>(this::authorization);
107108
}
108109

110+
public MqttOptionsConfigBuilder<T> store(
111+
String store)
112+
{
113+
this.store = store;
114+
return this;
115+
}
116+
109117
@Override
110118
public T build()
111119
{
112-
return mapper.apply(new MqttOptionsConfig(authorization, topics, versions));
120+
return mapper.apply(new MqttOptionsConfig(authorization, topics, versions, store));
113121
}
114122
}

runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttOptionsConfigAdapter.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public class MqttOptionsConfigAdapter implements OptionsConfigAdapterSpi, JsonbA
4444
private static final String AUTHORIZATION_CREDENTIALS_CONNECT_NAME = "connect";
4545
private static final String TOPICS_NAME = "topics";
4646
private static final String VERSIONS_NAME = "versions";
47+
private static final String STORE_NAME = "store";
4748

4849
private final MqttTopicConfigAdapter mqttTopic = new MqttTopicConfigAdapter();
4950

@@ -112,6 +113,12 @@ public JsonObject adaptToJson(
112113
object.add(VERSIONS_NAME, versions);
113114
}
114115

116+
String store = mqttOptions.store;
117+
if (store != null)
118+
{
119+
object.add(STORE_NAME, store);
120+
}
121+
115122
return object.build();
116123
}
117124

@@ -169,6 +176,11 @@ public OptionsConfig adaptFromJson(
169176
mqttOptions.versions(versions);
170177
}
171178

179+
if (object.containsKey(STORE_NAME))
180+
{
181+
mqttOptions.store(object.getString(STORE_NAME));
182+
}
183+
172184
return mqttOptions.build();
173185
}
174186
}

runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@
193193
import io.aklivity.zilla.runtime.engine.buffer.BufferPool;
194194
import io.aklivity.zilla.runtime.engine.concurrent.Signaler;
195195
import io.aklivity.zilla.runtime.engine.config.BindingConfig;
196+
import io.aklivity.zilla.runtime.engine.config.KindConfig;
196197
import io.aklivity.zilla.runtime.engine.config.ModelConfig;
197198
import io.aklivity.zilla.runtime.engine.config.WithConfig;
198199
import io.aklivity.zilla.runtime.engine.guard.GuardHandler;
@@ -541,6 +542,13 @@ public void attach(
541542
{
542543
MqttBindingConfig mqttBinding = new MqttBindingConfig(binding, context);
543544
bindings.put(binding.id, mqttBinding);
545+
546+
if (mqttBinding.kind == KindConfig.SERVER &&
547+
context.index() == 0 &&
548+
(mqttBinding.options == null || mqttBinding.options.store == null))
549+
{
550+
warnSessionOwnershipDeprecated(mqttBinding.name);
551+
}
544552
}
545553

546554
@Override
@@ -550,6 +558,24 @@ public void detach(
550558
bindings.remove(bindingId);
551559
}
552560

561+
private void warnSessionOwnershipDeprecated(
562+
String bindingName)
563+
{
564+
System.out.printf(
565+
"WARN [%s] MQTT session ownership will require the 'store' option in an upcoming release. " +
566+
"To prepare, configure a store reference on this binding:%n" +
567+
"%n" +
568+
" stores:%n" +
569+
" memory0:%n" +
570+
" type: memory%n" +
571+
"%n" +
572+
" bindings:%n" +
573+
" %s:%n" +
574+
" options:%n" +
575+
" store: memory0%n",
576+
bindingName, bindingName);
577+
}
578+
553579
@Override
554580
public MessageConsumer newStream(
555581
int msgTypeId,

runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/config/MqttOptionsConfigAdapterTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public void shouldReadOptions()
6464
"v3.1.1," +
6565
"v5" +
6666
"]," +
67+
"\"store\":\"memory0\"," +
6768
"\"authorization\":" +
6869
"{" +
6970
"\"test0\":" +
@@ -114,6 +115,7 @@ public void shouldReadOptions()
114115
assertThat(userProperty.value.model, equalTo("test"));
115116
assertThat(options.versions.get(0), equalTo(MqttVersion.V3_1_1));
116117
assertThat(options.versions.get(1), equalTo(MqttVersion.V_5));
118+
assertThat(options.store, equalTo("memory0"));
117119
}
118120

119121
@Test
@@ -140,7 +142,7 @@ public void shouldWriteOptions()
140142
singletonList(new MqttPatternConfig(
141143
MqttPatternConfig.MqttConnectProperty.USERNAME,
142144
"Bearer {credentials}")))),
143-
topics, versions);
145+
topics, versions, "memory0");
144146

145147
String text = jsonb.toJson(options);
146148

@@ -175,7 +177,8 @@ public void shouldWriteOptions()
175177
"[" +
176178
"\"v3.1.1\"," +
177179
"\"v5\"" +
178-
"]" +
180+
"]," +
181+
"\"store\":\"memory0\"" +
179182
"}"));
180183
}
181184
}

0 commit comments

Comments
 (0)