Skip to content

Commit eb2ae15

Browse files
authored
Merge pull request #5 from danube-messaging/implement_key_shared
Implement key shared subscription
2 parents 150385c + ebe7ab1 commit eb2ae15

16 files changed

Lines changed: 687 additions & 15 deletions

File tree

README.md

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@ The Java client library for interacting with Danube Messaging Broker platform.
1515

1616
### 📥 Consumer Capabilities
1717

18-
- **Flexible Subscriptions** - Three subscription types for different use cases:
18+
- **Flexible Subscriptions** - Four subscription types for different use cases:
1919
- **Exclusive** - Single active consumer, guaranteed ordering
2020
- **Shared** - Load balancing across multiple consumers, parallel processing
2121
- **Failover** - High availability with automatic standby promotion
22+
- **Key-Shared** - Per-key ordering with multi-consumer parallelism; messages with the same routing key always go to the same consumer
23+
- **Key Filtering** - In Key-Shared mode, subscribe to a subset of routing keys with glob patterns
2224
- **Message Acknowledgment** - Reliable message processing with at-least-once delivery
2325
- **Partitioned Consumption** - Automatic handling of messages from all partitions
2426
- **Message Attributes** - Access metadata and custom headers
@@ -47,14 +49,14 @@ The Java client library for interacting with Danube Messaging Broker platform.
4749
<dependency>
4850
<groupId>com.danube-messaging</groupId>
4951
<artifactId>danube-client</artifactId>
50-
<version>0.2.0</version>
52+
<version>0.5.0</version>
5153
</dependency>
5254
```
5355

5456
### Gradle
5557

5658
```groovy
57-
implementation 'com.danube-messaging:danube-client:0.2.0'
59+
implementation 'com.danube-messaging:danube-client:0.5.0'
5860
```
5961

6062
**Requirements:** Java 21 or later.
@@ -110,6 +112,14 @@ Producer producer = client.newProducer()
110112
.build();
111113
```
112114

115+
### Key-Shared Routing
116+
117+
Tag messages with a routing key so all messages with the same key go to the same consumer:
118+
119+
```java
120+
producer.sendWithKey(payload, Map.of(), "order-123");
121+
```
122+
113123
### Create Consumer
114124

115125
```java
@@ -164,6 +174,21 @@ shutdown.await();
164174
client.close();
165175
```
166176

177+
### Key-Shared with Filtering
178+
179+
Subscribe to only specific routing keys in a Key-Shared subscription:
180+
181+
```java
182+
Consumer consumer = client.newConsumer()
183+
.withTopic(topic)
184+
.withConsumerName("payments-worker")
185+
.withSubscription("orders-sub")
186+
.withSubscriptionType(SubType.KEY_SHARED)
187+
.withKeyFilter("payment")
188+
.withKeyFilter("invoice")
189+
.build();
190+
```
191+
167192
### Schema Registry
168193

169194
```java
@@ -205,7 +230,13 @@ Producer producer = client.newProducer()
205230
producer.create();
206231
```
207232

208-
Browse the [examples directory](https://github.com/danube-messaging/danube-java/tree/main/examples) for complete working code.
233+
Browse the [examples directory](https://github.com/danube-messaging/danube-java/tree/main/examples) for complete working code:
234+
235+
- **[SimpleProducerConsumer](examples/SimpleProducerConsumer.java)** — basic send/receive
236+
- **[ReliableDispatch](examples/ReliableDispatchProducer.java)** — at-least-once delivery with acks
237+
- **[Partitions](examples/PartitionsProducer.java)** — partitioned topic
238+
- **[KeyShared](examples/KeySharedProducer.java)** — Key-Shared routing, filtering, and producer with routing keys
239+
- **[JsonProducer](examples/JsonProducer.java)** / **[SchemaEvolution](examples/SchemaEvolution.java)** — schema registry integration
209240

210241
## Contribution
211242

danube-client-proto/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<parent>
88
<groupId>com.danube-messaging</groupId>
99
<artifactId>danube-java</artifactId>
10-
<version>0.4.0</version>
10+
<version>0.5.0</version>
1111
<relativePath>../pom.xml</relativePath>
1212
</parent>
1313

danube-client-proto/src/main/proto/DanubeApi.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,17 @@ message ConsumerRequest {
7474
Exclusive = 0; // Only one consumer can subscribe to the topic at a time.
7575
Shared = 1 ; // Multiple consumers can subscribe to the topic concurrently.
7676
Failover = 2; // Only one consumer (the active consumer) receives messages at any given time.
77+
KeyShared = 3; // Messages with same routing key always go to the same consumer, in order.
7778
}
7879
uint64 request_id = 1;
7980
string topic_name = 2;
8081
string consumer_name = 3;
8182
string subscription = 4;
8283
SubscriptionType subscription_type = 5;
84+
// Glob patterns for key-based filtering (optional, KeyShared only).
85+
// Examples: "user-*", "eu-west-?", "orders-premium-*"
86+
// Empty = accept all keys from hash ring assignment.
87+
repeated string key_filters = 6;
8388
}
8489

8590
// Create Consumer response
@@ -113,6 +118,9 @@ message StreamMessage {
113118
// NEW: Schema identification for registry-based schemas
114119
optional uint64 schema_id = 8; // Global schema ID from registry
115120
optional uint32 schema_version = 9; // Schema version number
121+
// Routing key for Key-Shared dispatch. Set by producer via send_with_key().
122+
// Carried through to consumers for application-level use.
123+
optional string routing_key = 10;
116124
}
117125

118126
// Unique ID of the message

danube-client/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<parent>
88
<groupId>com.danube-messaging</groupId>
99
<artifactId>danube-java</artifactId>
10-
<version>0.4.0</version>
10+
<version>0.5.0</version>
1111
<relativePath>../pom.xml</relativePath>
1212
</parent>
1313

danube-client/src/main/java/com/danubemessaging/client/ConsumerBuilder.java

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package com.danubemessaging.client;
22

33
import com.danubemessaging.client.errors.DanubeClientException;
4+
import java.util.ArrayList;
5+
import java.util.List;
46

57
/**
68
* Builder for {@link Consumer}.
@@ -12,6 +14,7 @@ public final class ConsumerBuilder {
1214
private String consumerName;
1315
private String subscription;
1416
private SubType subType = SubType.SHARED;
17+
private final List<String> keyFilters = new ArrayList<>();
1518
private ConsumerEventListener eventListener = ConsumerEventListener.noop();
1619
private int maxRetries;
1720
private long baseBackoffMs;
@@ -55,7 +58,8 @@ public ConsumerBuilder withSubscription(String subscription) {
5558
/**
5659
* Sets the subscription type. Defaults to {@link SubType#SHARED}.
5760
*
58-
* @param subType {@link SubType#EXCLUSIVE}, {@link SubType#SHARED}, or {@link SubType#FAILOVER}
61+
* @param subType {@link SubType#EXCLUSIVE}, {@link SubType#SHARED},
62+
* {@link SubType#FAILOVER}, or {@link SubType#KEY_SHARED}
5963
*/
6064
public ConsumerBuilder withSubscriptionType(SubType subType) {
6165
if (subType != null) {
@@ -64,6 +68,35 @@ public ConsumerBuilder withSubscriptionType(SubType subType) {
6468
return this;
6569
}
6670

71+
/**
72+
* Adds a single key filter pattern for {@link SubType#KEY_SHARED} subscriptions.
73+
* Uses glob syntax: {@code "user-*"}, {@code "eu-west-?"}, {@code "*"}.
74+
*
75+
* @param pattern the key filter pattern
76+
*/
77+
public ConsumerBuilder withKeyFilter(String pattern) {
78+
if (pattern != null && !pattern.isBlank()) {
79+
this.keyFilters.add(pattern);
80+
}
81+
return this;
82+
}
83+
84+
/**
85+
* Adds multiple key filter patterns for {@link SubType#KEY_SHARED} subscriptions.
86+
*
87+
* @param patterns the key filter patterns
88+
*/
89+
public ConsumerBuilder withKeyFilters(List<String> patterns) {
90+
if (patterns != null) {
91+
for (String p : patterns) {
92+
if (p != null && !p.isBlank()) {
93+
this.keyFilters.add(p);
94+
}
95+
}
96+
}
97+
return this;
98+
}
99+
67100
/**
68101
* Sets a listener for consumer lifecycle and message events.
69102
*
@@ -120,7 +153,9 @@ public Consumer build() {
120153
}
121154

122155
ConsumerOptions options = new ConsumerOptions(
123-
topic, consumerName, subscription, subType, eventListener,
156+
topic, consumerName, subscription, subType,
157+
List.copyOf(keyFilters),
158+
eventListener,
124159
maxRetries, baseBackoffMs, maxBackoffMs);
125160
return new Consumer(client, options);
126161
}

danube-client/src/main/java/com/danubemessaging/client/ConsumerOptions.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.danubemessaging.client;
22

3+
import java.util.List;
4+
35
/**
46
* Immutable consumer configuration.
57
*/
@@ -8,6 +10,7 @@ public record ConsumerOptions(
810
String consumerName,
911
String subscription,
1012
SubType subType,
13+
List<String> keyFilters,
1114
ConsumerEventListener eventListener,
1215
int maxRetries,
1316
long baseBackoffMs,

danube-client/src/main/java/com/danubemessaging/client/Producer.java

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,18 +144,56 @@ public CompletableFuture<Long> sendAsync(byte[] payload, Map<String, String> att
144144
* @throws com.danubemessaging.client.errors.DanubeClientException on unrecoverable error
145145
*/
146146
public long send(byte[] payload, Map<String, String> attributes) {
147+
return sendInternal(payload, attributes, null, selectTopicProducer());
148+
}
149+
150+
/**
151+
* Sends a message with a routing key asynchronously for KEY_SHARED subscriptions.
152+
*
153+
* @param payload message body
154+
* @param attributes optional metadata
155+
* @param routingKey the routing key; all messages with the same key go to the same consumer
156+
* @return a future resolving to the broker-assigned message sequence ID
157+
*/
158+
public CompletableFuture<Long> sendWithKeyAsync(byte[] payload, Map<String, String> attributes,
159+
String routingKey) {
160+
byte[] payloadCopy = payload == null ? new byte[0] : payload.clone();
161+
Map<String, String> attr = attributes == null ? Map.of() : Map.copyOf(attributes);
162+
return CompletableFuture.supplyAsync(() -> sendWithKey(payloadCopy, attr, routingKey), client.ioExecutor());
163+
}
164+
165+
/**
166+
* Sends a message with a routing key for KEY_SHARED subscriptions.
167+
*
168+
* <p>For partitioned topics, hashes the routing key to a specific partition ensuring
169+
* all messages with the same key always go to the same partition. For non-partitioned
170+
* topics, simply tags the routing key on the message.
171+
*
172+
* @param payload message body
173+
* @param attributes optional metadata; pass {@code Map.of()} for none
174+
* @param routingKey the routing key; must not be null
175+
* @return the broker-assigned message sequence ID
176+
* @throws com.danubemessaging.client.errors.DanubeClientException on unrecoverable error
177+
*/
178+
public long sendWithKey(byte[] payload, Map<String, String> attributes, String routingKey) {
179+
ensureOpen();
180+
TopicProducer topicProducer = selectTopicProducerForKey(routingKey);
181+
return sendInternal(payload, attributes, routingKey, topicProducer);
182+
}
183+
184+
private long sendInternal(byte[] payload, Map<String, String> attributes,
185+
String routingKey, TopicProducer topicProducer) {
147186
ensureOpen();
148187

149188
if (lifecycleState.get() != LifecycleState.CREATED) {
150189
create();
151190
}
152191

153-
TopicProducer topicProducer = selectTopicProducer();
154192
int attempts = 0;
155193

156194
while (true) {
157195
try {
158-
return topicProducer.send(payload, attributes);
196+
return topicProducer.send(payload, attributes, routingKey);
159197
} catch (RuntimeException error) {
160198
boolean unrecoverable = client.retryManager().isUnrecoverable(error);
161199
if (unrecoverable) {
@@ -227,6 +265,32 @@ private TopicProducer selectTopicProducer() {
227265
return topicProducers.get(index);
228266
}
229267

268+
private TopicProducer selectTopicProducerForKey(String routingKey) {
269+
if (topicProducers.isEmpty()) {
270+
throw new DanubeClientException("Producer is not initialized");
271+
}
272+
273+
if (topicProducers.size() == 1) {
274+
return topicProducers.get(0);
275+
}
276+
277+
int index = Math.floorMod((int) fnv1aHash(routingKey), topicProducers.size());
278+
return topicProducers.get(index);
279+
}
280+
281+
/**
282+
* FNV-1a 64-bit hash — must match Rust/Go/Python constants.
283+
*/
284+
static long fnv1aHash(String key) {
285+
long hash = 0xcbf29ce484222325L;
286+
byte[] bytes = key.getBytes(java.nio.charset.StandardCharsets.UTF_8);
287+
for (byte b : bytes) {
288+
hash ^= (b & 0xFF);
289+
hash *= 0x100000001b3L;
290+
}
291+
return hash;
292+
}
293+
230294
private boolean hasCustomRetryOptions() {
231295
return options.maxRetries() > 0 || options.baseBackoffMs() > 0 || options.maxBackoffMs() > 0;
232296
}

danube-client/src/main/java/com/danubemessaging/client/SubType.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,15 @@
88
public enum SubType {
99
EXCLUSIVE,
1010
SHARED,
11-
FAILOVER;
11+
FAILOVER,
12+
KEY_SHARED;
1213

1314
public DanubeApi.ConsumerRequest.SubscriptionType toProto() {
1415
return switch (this) {
1516
case EXCLUSIVE -> DanubeApi.ConsumerRequest.SubscriptionType.Exclusive;
1617
case SHARED -> DanubeApi.ConsumerRequest.SubscriptionType.Shared;
1718
case FAILOVER -> DanubeApi.ConsumerRequest.SubscriptionType.Failover;
19+
case KEY_SHARED -> DanubeApi.ConsumerRequest.SubscriptionType.KeyShared;
1820
};
1921
}
2022
}

danube-client/src/main/java/com/danubemessaging/client/internal/consumer/TopicConsumer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ private long trySubscribe() {
106106
.setConsumerName(consumerName)
107107
.setSubscription(options.subscription())
108108
.setSubscriptionType(options.subType().toProto())
109+
.addAllKeyFilters(options.keyFilters())
109110
.build();
110111

111112
try {

danube-client/src/main/java/com/danubemessaging/client/internal/producer/TopicProducer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ private long tryCreate() {
138138
}
139139
}
140140

141-
public long send(byte[] payload, Map<String, String> attributes) {
141+
public long send(byte[] payload, Map<String, String> attributes, String routingKey) {
142142
ensureOpen();
143143

144144
if (lifecycleState.get() != LifecycleState.CREATED || producerId == 0) {
@@ -173,6 +173,9 @@ public long send(byte[] payload, Map<String, String> attributes) {
173173
if (cachedSchemaVersion != null) {
174174
messageBuilder.setSchemaVersion(cachedSchemaVersion);
175175
}
176+
if (routingKey != null) {
177+
messageBuilder.setRoutingKey(routingKey);
178+
}
176179

177180
DanubeApi.StreamMessage message = messageBuilder.build();
178181

0 commit comments

Comments
 (0)