Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 35 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ The Java client library for interacting with Danube Messaging Broker platform.

### 📥 Consumer Capabilities

- **Flexible Subscriptions** - Three subscription types for different use cases:
- **Flexible Subscriptions** - Four subscription types for different use cases:
- **Exclusive** - Single active consumer, guaranteed ordering
- **Shared** - Load balancing across multiple consumers, parallel processing
- **Failover** - High availability with automatic standby promotion
- **Key-Shared** - Per-key ordering with multi-consumer parallelism; messages with the same routing key always go to the same consumer
- **Key Filtering** - In Key-Shared mode, subscribe to a subset of routing keys with glob patterns
- **Message Acknowledgment** - Reliable message processing with at-least-once delivery
- **Partitioned Consumption** - Automatic handling of messages from all partitions
- **Message Attributes** - Access metadata and custom headers
Expand Down Expand Up @@ -47,14 +49,14 @@ The Java client library for interacting with Danube Messaging Broker platform.
<dependency>
<groupId>com.danube-messaging</groupId>
<artifactId>danube-client</artifactId>
<version>0.2.0</version>
<version>0.5.0</version>
</dependency>
```

### Gradle

```groovy
implementation 'com.danube-messaging:danube-client:0.2.0'
implementation 'com.danube-messaging:danube-client:0.5.0'
```

**Requirements:** Java 21 or later.
Expand Down Expand Up @@ -110,6 +112,14 @@ Producer producer = client.newProducer()
.build();
```

### Key-Shared Routing

Tag messages with a routing key so all messages with the same key go to the same consumer:

```java
producer.sendWithKey(payload, Map.of(), "order-123");
```

### Create Consumer

```java
Expand Down Expand Up @@ -164,6 +174,21 @@ shutdown.await();
client.close();
```

### Key-Shared with Filtering

Subscribe to only specific routing keys in a Key-Shared subscription:

```java
Consumer consumer = client.newConsumer()
.withTopic(topic)
.withConsumerName("payments-worker")
.withSubscription("orders-sub")
.withSubscriptionType(SubType.KEY_SHARED)
.withKeyFilter("payment")
.withKeyFilter("invoice")
.build();
```

### Schema Registry

```java
Expand Down Expand Up @@ -205,7 +230,13 @@ Producer producer = client.newProducer()
producer.create();
```

Browse the [examples directory](https://github.com/danube-messaging/danube-java/tree/main/examples) for complete working code.
Browse the [examples directory](https://github.com/danube-messaging/danube-java/tree/main/examples) for complete working code:

- **[SimpleProducerConsumer](examples/SimpleProducerConsumer.java)** — basic send/receive
- **[ReliableDispatch](examples/ReliableDispatchProducer.java)** — at-least-once delivery with acks
- **[Partitions](examples/PartitionsProducer.java)** — partitioned topic
- **[KeyShared](examples/KeySharedProducer.java)** — Key-Shared routing, filtering, and producer with routing keys
- **[JsonProducer](examples/JsonProducer.java)** / **[SchemaEvolution](examples/SchemaEvolution.java)** — schema registry integration

## Contribution

Expand Down
2 changes: 1 addition & 1 deletion danube-client-proto/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>com.danube-messaging</groupId>
<artifactId>danube-java</artifactId>
<version>0.4.0</version>
<version>0.5.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
8 changes: 8 additions & 0 deletions danube-client-proto/src/main/proto/DanubeApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,17 @@ message ConsumerRequest {
Exclusive = 0; // Only one consumer can subscribe to the topic at a time.
Shared = 1 ; // Multiple consumers can subscribe to the topic concurrently.
Failover = 2; // Only one consumer (the active consumer) receives messages at any given time.
KeyShared = 3; // Messages with same routing key always go to the same consumer, in order.
}
uint64 request_id = 1;
string topic_name = 2;
string consumer_name = 3;
string subscription = 4;
SubscriptionType subscription_type = 5;
// Glob patterns for key-based filtering (optional, KeyShared only).
// Examples: "user-*", "eu-west-?", "orders-premium-*"
// Empty = accept all keys from hash ring assignment.
repeated string key_filters = 6;
}

// Create Consumer response
Expand Down Expand Up @@ -113,6 +118,9 @@ message StreamMessage {
// NEW: Schema identification for registry-based schemas
optional uint64 schema_id = 8; // Global schema ID from registry
optional uint32 schema_version = 9; // Schema version number
// Routing key for Key-Shared dispatch. Set by producer via send_with_key().
// Carried through to consumers for application-level use.
optional string routing_key = 10;
}

// Unique ID of the message
Expand Down
2 changes: 1 addition & 1 deletion danube-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>com.danube-messaging</groupId>
<artifactId>danube-java</artifactId>
<version>0.4.0</version>
<version>0.5.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.danubemessaging.client;

import com.danubemessaging.client.errors.DanubeClientException;
import java.util.ArrayList;
import java.util.List;

/**
* Builder for {@link Consumer}.
Expand All @@ -12,6 +14,7 @@ public final class ConsumerBuilder {
private String consumerName;
private String subscription;
private SubType subType = SubType.SHARED;
private final List<String> keyFilters = new ArrayList<>();
private ConsumerEventListener eventListener = ConsumerEventListener.noop();
private int maxRetries;
private long baseBackoffMs;
Expand Down Expand Up @@ -55,7 +58,8 @@ public ConsumerBuilder withSubscription(String subscription) {
/**
* Sets the subscription type. Defaults to {@link SubType#SHARED}.
*
* @param subType {@link SubType#EXCLUSIVE}, {@link SubType#SHARED}, or {@link SubType#FAILOVER}
* @param subType {@link SubType#EXCLUSIVE}, {@link SubType#SHARED},
* {@link SubType#FAILOVER}, or {@link SubType#KEY_SHARED}
*/
public ConsumerBuilder withSubscriptionType(SubType subType) {
if (subType != null) {
Expand All @@ -64,6 +68,35 @@ public ConsumerBuilder withSubscriptionType(SubType subType) {
return this;
}

/**
* Adds a single key filter pattern for {@link SubType#KEY_SHARED} subscriptions.
* Uses glob syntax: {@code "user-*"}, {@code "eu-west-?"}, {@code "*"}.
*
* @param pattern the key filter pattern
*/
public ConsumerBuilder withKeyFilter(String pattern) {
if (pattern != null && !pattern.isBlank()) {
this.keyFilters.add(pattern);
}
return this;
}

/**
* Adds multiple key filter patterns for {@link SubType#KEY_SHARED} subscriptions.
*
* @param patterns the key filter patterns
*/
public ConsumerBuilder withKeyFilters(List<String> patterns) {
if (patterns != null) {
for (String p : patterns) {
if (p != null && !p.isBlank()) {
this.keyFilters.add(p);
}
}
}
return this;
}

/**
* Sets a listener for consumer lifecycle and message events.
*
Expand Down Expand Up @@ -120,7 +153,9 @@ public Consumer build() {
}

ConsumerOptions options = new ConsumerOptions(
topic, consumerName, subscription, subType, eventListener,
topic, consumerName, subscription, subType,
List.copyOf(keyFilters),
eventListener,
maxRetries, baseBackoffMs, maxBackoffMs);
return new Consumer(client, options);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.danubemessaging.client;

import java.util.List;

/**
* Immutable consumer configuration.
*/
Expand All @@ -8,6 +10,7 @@ public record ConsumerOptions(
String consumerName,
String subscription,
SubType subType,
List<String> keyFilters,
ConsumerEventListener eventListener,
int maxRetries,
long baseBackoffMs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,56 @@ public CompletableFuture<Long> sendAsync(byte[] payload, Map<String, String> att
* @throws com.danubemessaging.client.errors.DanubeClientException on unrecoverable error
*/
public long send(byte[] payload, Map<String, String> attributes) {
return sendInternal(payload, attributes, null, selectTopicProducer());
}

/**
* Sends a message with a routing key asynchronously for KEY_SHARED subscriptions.
*
* @param payload message body
* @param attributes optional metadata
* @param routingKey the routing key; all messages with the same key go to the same consumer
* @return a future resolving to the broker-assigned message sequence ID
*/
public CompletableFuture<Long> sendWithKeyAsync(byte[] payload, Map<String, String> attributes,
String routingKey) {
byte[] payloadCopy = payload == null ? new byte[0] : payload.clone();
Map<String, String> attr = attributes == null ? Map.of() : Map.copyOf(attributes);
return CompletableFuture.supplyAsync(() -> sendWithKey(payloadCopy, attr, routingKey), client.ioExecutor());
}

/**
* Sends a message with a routing key for KEY_SHARED subscriptions.
*
* <p>For partitioned topics, hashes the routing key to a specific partition ensuring
* all messages with the same key always go to the same partition. For non-partitioned
* topics, simply tags the routing key on the message.
*
* @param payload message body
* @param attributes optional metadata; pass {@code Map.of()} for none
* @param routingKey the routing key; must not be null
* @return the broker-assigned message sequence ID
* @throws com.danubemessaging.client.errors.DanubeClientException on unrecoverable error
*/
public long sendWithKey(byte[] payload, Map<String, String> attributes, String routingKey) {
ensureOpen();
TopicProducer topicProducer = selectTopicProducerForKey(routingKey);
return sendInternal(payload, attributes, routingKey, topicProducer);
}

private long sendInternal(byte[] payload, Map<String, String> attributes,
String routingKey, TopicProducer topicProducer) {
ensureOpen();

if (lifecycleState.get() != LifecycleState.CREATED) {
create();
}

TopicProducer topicProducer = selectTopicProducer();
int attempts = 0;

while (true) {
try {
return topicProducer.send(payload, attributes);
return topicProducer.send(payload, attributes, routingKey);
} catch (RuntimeException error) {
boolean unrecoverable = client.retryManager().isUnrecoverable(error);
if (unrecoverable) {
Expand Down Expand Up @@ -227,6 +265,32 @@ private TopicProducer selectTopicProducer() {
return topicProducers.get(index);
}

private TopicProducer selectTopicProducerForKey(String routingKey) {
if (topicProducers.isEmpty()) {
throw new DanubeClientException("Producer is not initialized");
}

if (topicProducers.size() == 1) {
return topicProducers.get(0);
}

int index = Math.floorMod((int) fnv1aHash(routingKey), topicProducers.size());
return topicProducers.get(index);
}

/**
* FNV-1a 64-bit hash — must match Rust/Go/Python constants.
*/
static long fnv1aHash(String key) {
long hash = 0xcbf29ce484222325L;
byte[] bytes = key.getBytes(java.nio.charset.StandardCharsets.UTF_8);
for (byte b : bytes) {
hash ^= (b & 0xFF);
hash *= 0x100000001b3L;
}
return hash;
}

private boolean hasCustomRetryOptions() {
return options.maxRetries() > 0 || options.baseBackoffMs() > 0 || options.maxBackoffMs() > 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
public enum SubType {
EXCLUSIVE,
SHARED,
FAILOVER;
FAILOVER,
KEY_SHARED;

public DanubeApi.ConsumerRequest.SubscriptionType toProto() {
return switch (this) {
case EXCLUSIVE -> DanubeApi.ConsumerRequest.SubscriptionType.Exclusive;
case SHARED -> DanubeApi.ConsumerRequest.SubscriptionType.Shared;
case FAILOVER -> DanubeApi.ConsumerRequest.SubscriptionType.Failover;
case KEY_SHARED -> DanubeApi.ConsumerRequest.SubscriptionType.KeyShared;
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ private long trySubscribe() {
.setConsumerName(consumerName)
.setSubscription(options.subscription())
.setSubscriptionType(options.subType().toProto())
.addAllKeyFilters(options.keyFilters())
.build();

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private long tryCreate() {
}
}

public long send(byte[] payload, Map<String, String> attributes) {
public long send(byte[] payload, Map<String, String> attributes, String routingKey) {
ensureOpen();

if (lifecycleState.get() != LifecycleState.CREATED || producerId == 0) {
Expand Down Expand Up @@ -173,6 +173,9 @@ public long send(byte[] payload, Map<String, String> attributes) {
if (cachedSchemaVersion != null) {
messageBuilder.setSchemaVersion(cachedSchemaVersion);
}
if (routingKey != null) {
messageBuilder.setRoutingKey(routingKey);
}

DanubeApi.StreamMessage message = messageBuilder.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ public record StreamMessage(
String subscriptionName,
Map<String, String> attributes,
Long schemaId,
Integer schemaVersion) {
Integer schemaVersion,
String routingKey) {

public StreamMessage {
payload = payload == null ? new byte[0] : payload.clone();
Expand All @@ -30,6 +31,7 @@ public byte[] payload() {
public static StreamMessage fromProto(DanubeApi.StreamMessage proto) {
Long schemaId = proto.hasSchemaId() ? proto.getSchemaId() : null;
Integer schemaVersion = proto.hasSchemaVersion() ? proto.getSchemaVersion() : null;
String routingKey = proto.hasRoutingKey() ? proto.getRoutingKey() : null;

return new StreamMessage(
proto.getRequestId(),
Expand All @@ -40,6 +42,7 @@ public static StreamMessage fromProto(DanubeApi.StreamMessage proto) {
proto.getSubscriptionName(),
proto.getAttributesMap(),
schemaId,
schemaVersion);
schemaVersion,
routingKey);
}
}
Loading
Loading