Skip to content

Commit e53a358

Browse files
jfallowsclaude
andauthored
feat(binding-mqtt-kafka): deprecate Kafka-group session ownership (#1797) (#1802)
Add an optional, currently-unused `store` field to the mqtt-kafka binding options so configurations can prepare for StoreHandler-based session ownership coordination. When `store` is absent, log a one-time deprecation warning per binding instance announcing the upcoming requirement. Existing highlander-group ownership behavior is unchanged. https://claude.ai/code/session_0186rZE8mW1U947XTN4kzZ1x Co-authored-by: Claude <noreply@anthropic.com>
1 parent e1a923d commit e53a358

8 files changed

Lines changed: 131 additions & 1 deletion

File tree

runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/config/MqttKafkaOptionsConfig.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ public class MqttKafkaOptionsConfig extends OptionsConfig
2323
{
2424
public final MqttKafkaTopicsConfig topics;
2525
public final String serverRef;
26+
public final String store;
2627
public final List<String> clients;
2728
public final MqttKafkaPublishConfig publish;
2829

@@ -40,11 +41,13 @@ public static <T> MqttKafkaOptionsConfigBuilder<T> builder(
4041
MqttKafkaOptionsConfig(
4142
MqttKafkaTopicsConfig topics,
4243
String serverRef,
44+
String store,
4345
List<String> clients,
4446
MqttKafkaPublishConfig publish)
4547
{
4648
this.topics = topics;
4749
this.serverRef = serverRef;
50+
this.store = store;
4851
this.clients = clients;
4952
this.publish = publish;
5053
}

runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/config/MqttKafkaOptionsConfigBuilder.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public class MqttKafkaOptionsConfigBuilder<T> extends ConfigBuilder<T, MqttKafka
2626

2727
private MqttKafkaTopicsConfig topics;
2828
private String serverRef;
29+
private String store;
2930
private List<String> clients;
3031
private MqttKafkaPublishConfig publish;
3132

@@ -62,6 +63,13 @@ public MqttKafkaOptionsConfigBuilder<T> serverRef(
6263
return this;
6364
}
6465

66+
public MqttKafkaOptionsConfigBuilder<T> store(
67+
String store)
68+
{
69+
this.store = store;
70+
return this;
71+
}
72+
6573
public MqttKafkaOptionsConfigBuilder<T> clients(
6674
List<String> clients)
6775
{
@@ -85,6 +93,6 @@ public MqttKafkaPublishConfigBuilder<MqttKafkaOptionsConfigBuilder<T>> publish()
8593
@Override
8694
public T build()
8795
{
88-
return mapper.apply(new MqttKafkaOptionsConfig(topics, serverRef, clients, publish));
96+
return mapper.apply(new MqttKafkaOptionsConfig(topics, serverRef, store, clients, publish));
8997
}
9098
}

runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/config/MqttKafkaOptionsConfigAdapter.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public class MqttKafkaOptionsConfigAdapter implements OptionsConfigAdapterSpi, J
3838
{
3939
private static final String TOPICS_NAME = "topics";
4040
private static final String SERVER_NAME = "server";
41+
private static final String STORE_NAME = "store";
4142
private static final String CLIENTS_NAME = "clients";
4243
private static final String SESSIONS_NAME = "sessions";
4344
private static final String MESSAGES_NAME = "messages";
@@ -66,13 +67,18 @@ public JsonObject adaptToJson(
6667
JsonObjectBuilder object = Json.createObjectBuilder();
6768

6869
String serverRef = mqttKafkaOptions.serverRef;
70+
String store = mqttKafkaOptions.store;
6971
MqttKafkaTopicsConfig topics = mqttKafkaOptions.topics;
7072
List<String> clients = mqttKafkaOptions.clients;
7173

7274
if (serverRef != null)
7375
{
7476
object.add(SERVER_NAME, serverRef);
7577
}
78+
if (store != null)
79+
{
80+
object.add(STORE_NAME, store);
81+
}
7682
if (topics != null)
7783
{
7884
JsonObjectBuilder newTopics = Json.createObjectBuilder();
@@ -113,6 +119,7 @@ public OptionsConfig adaptFromJson(
113119
MqttKafkaOptionsConfigBuilder<MqttKafkaOptionsConfig> options = MqttKafkaOptionsConfig.builder();
114120
JsonObject topics = object.getJsonObject(TOPICS_NAME);
115121
options.serverRef(object.getString(SERVER_NAME, null));
122+
options.store(object.getString(STORE_NAME, null));
116123
JsonArray clientsJson = object.getJsonArray(CLIENTS_NAME);
117124
JsonObject publish = object.getJsonObject(PUBLISH_NAME);
118125

runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,11 @@ public void onAttached(
377377
this.groupIdPrefix =
378378
String.format(groupIdPrefixFormat, supplyNamespace.apply(bindingId), supplyLocalName.apply(bindingId));
379379

380+
if (binding.options.store == null && coreIndex == 0)
381+
{
382+
warnSessionOwnershipDeprecated(supplyLocalName.apply(bindingId));
383+
}
384+
380385
if (willAvailable && coreIndex == 0)
381386
{
382387
Optional<MqttKafkaRouteConfig> route = binding.routes.stream().findFirst();
@@ -389,6 +394,25 @@ public void onAttached(
389394
sessionIds.put(bindingId, supplySessionId.get());
390395
}
391396

397+
private void warnSessionOwnershipDeprecated(
398+
String bindingName)
399+
{
400+
System.out.printf(
401+
"WARN [%s] Session ownership coordination via Kafka consumer group is deprecated. " +
402+
"The 'store' option will be required in an upcoming release. " +
403+
"To prepare, configure a store reference on this binding:%n" +
404+
"%n" +
405+
" stores:%n" +
406+
" memory0:%n" +
407+
" type: memory%n" +
408+
"%n" +
409+
" bindings:%n" +
410+
" %s:%n" +
411+
" options:%n" +
412+
" store: memory0%n",
413+
bindingName, bindingName);
414+
}
415+
392416
@Override
393417
public void onDetached(
394418
long bindingId)

runtime/binding-mqtt-kafka/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/config/MqttKafkaOptionsConfigAdapterTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,53 @@ public void shouldWriteOptions()
109109
"}"));
110110
}
111111

112+
@Test
113+
public void shouldReadOptionsWithStore()
114+
{
115+
String text =
116+
"{" +
117+
"\"store\":\"memory0\"," +
118+
"\"topics\":" +
119+
"{" +
120+
"\"sessions\":\"sessions\"," +
121+
"\"messages\":\"messages\"," +
122+
"\"retained\":\"retained\"" +
123+
"}" +
124+
"}";
125+
126+
MqttKafkaOptionsConfig options = jsonb.fromJson(text, MqttKafkaOptionsConfig.class);
127+
128+
assertThat(options, not(nullValue()));
129+
assertThat(options.store, equalTo("memory0"));
130+
}
131+
132+
@Test
133+
public void shouldWriteOptionsWithStore()
134+
{
135+
MqttKafkaOptionsConfig options = MqttKafkaOptionsConfig.builder()
136+
.topics(MqttKafkaTopicsConfig.builder()
137+
.sessions("sessions")
138+
.messages("messages")
139+
.retained("retained")
140+
.build())
141+
.store("memory0")
142+
.build();
143+
144+
String text = jsonb.toJson(options);
145+
146+
assertThat(text, not(nullValue()));
147+
assertThat(text, equalTo(
148+
"{" +
149+
"\"store\":\"memory0\"," +
150+
"\"topics\":" +
151+
"{" +
152+
"\"sessions\":\"sessions\"," +
153+
"\"messages\":\"messages\"," +
154+
"\"retained\":\"retained\"" +
155+
"}" +
156+
"}"));
157+
}
158+
112159
@Test
113160
public void shouldReadOptionsWithoutClients()
114161
{
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#
2+
# Copyright 2021-2024 Aklivity Inc
3+
#
4+
# Licensed under the Aklivity Community License (the "License"); you may not use
5+
# this file except in compliance with the License. You may obtain a copy of the
6+
# License at
7+
#
8+
# https://www.aklivity.io/aklivity-community-license/
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
13+
# specific language governing permissions and limitations under the License.
14+
#
15+
16+
---
17+
name: test
18+
bindings:
19+
mqtt0:
20+
type: mqtt-kafka
21+
kind: proxy
22+
options:
23+
store: memory0
24+
topics:
25+
sessions: sessions
26+
messages: messages
27+
retained: retained
28+
exit: kafka0

specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/schema/mqtt.kafka.schema.patch.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@
4343
"title": "Server Reference",
4444
"type": "string"
4545
},
46+
"store":
47+
{
48+
"title": "Store Reference",
49+
"type": "string"
50+
},
4651
"topics":
4752
{
4853
"title": "Topics",

specs/binding-mqtt-kafka.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/kafka/config/SchemaTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,14 @@ public void shouldValidateProxyWithOptions()
5050
assertThat(config, not(nullValue()));
5151
}
5252

53+
@Test
54+
public void shouldValidateProxyWithStore()
55+
{
56+
JsonObject config = schema.validate("proxy.store.yaml");
57+
58+
assertThat(config, not(nullValue()));
59+
}
60+
5361
@Test
5462
public void shouldValidateProxyWithPublishQosMax()
5563
{

0 commit comments

Comments
 (0)