|
1 | | -# danube-java |
| 1 | +# Danube-java client |
2 | 2 |
|
3 | | -Java client library for Danube Messaging platform. |
| 3 | +The Java client library for interacting with Danube Messaging Broker platform. |
4 | 4 |
|
5 | | -## Repository layout |
| 5 | +[Danube](https://github.com/danube-messaging/danube) is an open-source **distributed** Messaging platform written in Rust. Consult [the documentation](https://danube-docs.dev-state.com/) for supported concepts and the platform architecture. |
6 | 6 |
|
7 | | -- `danube-client-proto`: generated protobuf + gRPC Java stubs |
8 | | -- `danube-client`: handwritten Java client API and internals |
| 7 | +## Features |
9 | 8 |
|
10 | | -## Where to copy proto files |
| 9 | +### 📤 Producer Capabilities |
11 | 10 |
|
12 | | -Copy all `.proto` files here (preserve import paths): |
| 11 | +- **Basic Messaging** - Send messages with byte payloads and optional key-value attributes |
| 12 | +- **Partitioned Topics** - Distribute messages across multiple partitions for horizontal scaling |
| 13 | +- **Reliable Dispatch** - Guaranteed message delivery with persistence (WAL + cloud storage) |
| 14 | +- **Schema Integration** - Type-safe messaging with automatic validation (Bytes, String, Number, Avro, JSON Schema, Protobuf) |
13 | 15 |
|
14 | | -`danube-client-proto/src/main/proto/` |
| 16 | +### 📥 Consumer Capabilities |
15 | 17 |
|
16 | | -Example: |
| 18 | +- **Flexible Subscriptions** - Three subscription types for different use cases: |
| 19 | + - **Exclusive** - Single active consumer, guaranteed ordering |
| 20 | + - **Shared** - Load balancing across multiple consumers, parallel processing |
| 21 | + - **Failover** - High availability with automatic standby promotion |
| 22 | +- **Message Acknowledgment** - Reliable message processing with at-least-once delivery |
| 23 | +- **Partitioned Consumption** - Automatic handling of messages from all partitions |
| 24 | +- **Message Attributes** - Access metadata and custom headers |
17 | 25 |
|
18 | | -```text |
19 | | -danube-client-proto/src/main/proto/ |
20 | | - DanubeApi.proto |
21 | | - SchemaRegistry.proto |
22 | | -``` |
| 26 | +### 🔐 Schema Registry |
23 | 27 |
|
24 | | -## How to generate Java stubs |
| 28 | +- **Schema Management** - Register, version, and retrieve schemas (JSON Schema, Avro, Protobuf) |
| 29 | +- **Compatibility Checking** - Validate schema evolution (Backward, Forward, Full, None modes) |
| 30 | +- **Type Safety** - Automatic validation against registered schemas |
| 31 | +- **Schema Evolution** - Safe schema updates with compatibility enforcement |
25 | 32 |
|
26 | | -No manual `protoc` install is required. Maven downloads the compiler/plugin. |
| 33 | +### 🏗️ Client Features |
27 | 34 |
|
28 | | -From repo root: |
| 35 | +- **Virtual Threads** - Built on Project Loom for efficient I/O without blocking platform threads |
| 36 | +- **Reactive API** - `Flow.Publisher<StreamMessage>` receive API (Java standard) |
| 37 | +- **Connection Pooling** - Shared gRPC channel management across producers/consumers |
| 38 | +- **TLS / mTLS** - Secure connections with custom CA and client certificates |
| 39 | +- **JWT Authentication** - API-key based token exchange with automatic renewal |
| 40 | +- **Topic Namespaces** - Organize topics with namespace structure (`/namespace/topic-name`) |
29 | 41 |
|
30 | | -```bash |
31 | | -mvn -pl danube-client-proto -am generate-sources |
| 42 | +## Installation |
| 43 | + |
| 44 | +### Maven |
| 45 | + |
| 46 | +```xml |
| 47 | +<dependency> |
| 48 | + <groupId>com.danubemessaging</groupId> |
| 49 | + <artifactId>danube-client</artifactId> |
| 50 | + <version>0.2.0</version> |
| 51 | +</dependency> |
32 | 52 | ``` |
33 | 53 |
|
34 | | -or via helper script: |
| 54 | +### Gradle |
35 | 55 |
|
36 | | -```bash |
37 | | -bash scripts/generate-stubs.sh |
| 56 | +```groovy |
| 57 | +implementation 'com.danubemessaging:danube-client:0.2.0' |
38 | 58 | ``` |
39 | 59 |
|
40 | | -Generated sources will be in: |
| 60 | +**Requirements:** Java 21 or later. |
41 | 61 |
|
42 | | -- `danube-client-proto/target/generated-sources/protobuf/java` |
43 | | -- `danube-client-proto/target/generated-sources/protobuf/grpc-java` |
| 62 | +## Example Usage |
44 | 63 |
|
45 | | -## Build all modules |
| 64 | +Check out the [example files](https://github.com/danube-messaging/danube-java/tree/main/examples). |
46 | 65 |
|
47 | | -```bash |
48 | | -mvn clean verify |
49 | | -``` |
| 66 | +### Start the Danube server |
| 67 | + |
| 68 | +Use the [instructions from the documentation](https://danube-docs.dev-state.com/) to run the Danube broker/cluster. |
50 | 69 |
|
51 | | -## Schema registry usage (Phase 5) |
| 70 | +### Create Producer |
52 | 71 |
|
53 | 72 | ```java |
| 73 | +import com.danubemessaging.client.DanubeClient; |
| 74 | +import com.danubemessaging.client.Producer; |
| 75 | +import java.util.Map; |
| 76 | + |
54 | 77 | DanubeClient client = DanubeClient.builder() |
55 | | - .serviceUrl("http://localhost:6650") |
| 78 | + .serviceUrl("http://127.0.0.1:6650") |
56 | 79 | .build(); |
57 | 80 |
|
58 | | -SchemaRegistryClient registry = client.newSchemaRegistry(); |
59 | | - |
60 | | -SchemaRegistration registration = registry.registerSchema( |
61 | | - registry.newRegistration() |
62 | | - .withSubject("/default/user-events") |
63 | | - .withSchemaType(SchemaType.JSON_SCHEMA) |
64 | | - .withSchemaDefinition(schemaBytes) |
65 | | - .withDescription("User event payload") |
66 | | - .withCreatedBy("java-client")); |
| 81 | +String topic = "/default/test_topic"; |
| 82 | +String producerName = "test_producer"; |
67 | 83 |
|
68 | 84 | Producer producer = client.newProducer() |
69 | | - .withTopic("/default/user-events") |
70 | | - .withName("orders-producer") |
71 | | - .withSchemaLatest("/default/user-events") |
| 85 | + .withTopic(topic) |
| 86 | + .withName(producerName) |
72 | 87 | .build(); |
| 88 | + |
| 89 | +producer.create(); |
| 90 | +System.out.printf("The Producer %s was created%n", producerName); |
| 91 | + |
| 92 | +byte[] payload = "Hello Danube".getBytes(); |
| 93 | +long messageId = producer.send(payload, Map.of()); |
| 94 | +System.out.printf("The Message with id %d was sent%n", messageId); |
| 95 | + |
| 96 | +client.close(); |
73 | 97 | ``` |
74 | 98 |
|
75 | | -## Observability hooks (Phase 4) |
| 99 | +### Reliable Dispatch (optional) |
| 100 | + |
| 101 | +Reliable dispatch can be enabled when creating the producer; the broker will stream messages to consumers from WAL and cloud storage. |
76 | 102 |
|
77 | 103 | ```java |
| 104 | +import com.danubemessaging.client.DispatchStrategy; |
| 105 | + |
78 | 106 | Producer producer = client.newProducer() |
79 | | - .withTopic("/default/user-events") |
80 | | - .withName("orders-producer") |
81 | | - .withEventListener(new ProducerEventListener() { |
82 | | - @Override |
83 | | - public void onProducerCreated(String topic, String producerName, long producerId) { |
84 | | - System.out.printf("Producer ready topic=%s id=%d%n", topic, producerId); |
85 | | - } |
86 | | - |
87 | | - @Override |
88 | | - public void onSendError(String topic, String producerName, byte[] payload, Throwable error) { |
89 | | - System.err.printf("Send failed topic=%s cause=%s%n", topic, error.getMessage()); |
90 | | - } |
91 | | - }) |
| 107 | + .withTopic(topic) |
| 108 | + .withName(producerName) |
| 109 | + .withDispatchStrategy(DispatchStrategy.RELIABLE) |
92 | 110 | .build(); |
93 | 111 | ``` |
94 | 112 |
|
95 | | -## Consumer schema metadata |
| 113 | +### Create Consumer |
96 | 114 |
|
97 | | -`StreamMessage` now includes optional schema registry metadata when available: |
| 115 | +```java |
| 116 | +import com.danubemessaging.client.Consumer; |
| 117 | +import com.danubemessaging.client.DanubeClient; |
| 118 | +import com.danubemessaging.client.SubType; |
| 119 | +import com.danubemessaging.client.model.StreamMessage; |
| 120 | +import java.util.concurrent.CountDownLatch; |
| 121 | +import java.util.concurrent.Flow; |
98 | 122 |
|
99 | | -- `schemaId()` |
100 | | -- `schemaVersion()` |
| 123 | +DanubeClient client = DanubeClient.builder() |
| 124 | + .serviceUrl("http://127.0.0.1:6650") |
| 125 | + .build(); |
101 | 126 |
|
102 | | -```java |
103 | | -consumer.receiveAsync().thenAccept(msg -> { |
104 | | - if (msg.schemaId() != null) { |
105 | | - System.out.printf("Schema id=%d version=%s%n", msg.schemaId(), msg.schemaVersion()); |
| 127 | +String topic = "/default/test_topic"; |
| 128 | +String consumerName = "test_consumer"; |
| 129 | +String subscriptionName = "test_subscription"; |
| 130 | + |
| 131 | +Consumer consumer = client.newConsumer() |
| 132 | + .withTopic(topic) |
| 133 | + .withConsumerName(consumerName) |
| 134 | + .withSubscription(subscriptionName) |
| 135 | + .withSubscriptionType(SubType.EXCLUSIVE) |
| 136 | + .build(); |
| 137 | + |
| 138 | +// Subscribe to the topic |
| 139 | +consumer.subscribe(); |
| 140 | +System.out.printf("The Consumer %s was created%n", consumerName); |
| 141 | + |
| 142 | +CountDownLatch shutdown = new CountDownLatch(1); |
| 143 | + |
| 144 | +// Start receiving messages via Flow.Publisher |
| 145 | +consumer.receive().subscribe(new Flow.Subscriber<>() { |
| 146 | + @Override |
| 147 | + public void onSubscribe(Flow.Subscription s) { |
| 148 | + s.request(Long.MAX_VALUE); |
106 | 149 | } |
| 150 | + |
| 151 | + @Override |
| 152 | + public void onNext(StreamMessage msg) { |
| 153 | + System.out.printf("Received message: %s%n", new String(msg.payload())); |
| 154 | + |
| 155 | + // Acknowledge the message |
| 156 | + consumer.ack(msg); |
| 157 | + } |
| 158 | + |
| 159 | + @Override public void onError(Throwable t) { shutdown.countDown(); } |
| 160 | + @Override public void onComplete() { shutdown.countDown(); } |
107 | 161 | }); |
| 162 | + |
| 163 | +shutdown.await(); |
| 164 | +client.close(); |
| 165 | +``` |
| 166 | + |
| 167 | +### Schema Registry |
| 168 | + |
| 169 | +```java |
| 170 | +import com.danubemessaging.client.DanubeClient; |
| 171 | +import com.danubemessaging.client.Producer; |
| 172 | +import com.danubemessaging.client.SchemaRegistryClient; |
| 173 | +import com.danubemessaging.client.schema.SchemaType; |
| 174 | +import java.util.Map; |
| 175 | + |
| 176 | +DanubeClient client = DanubeClient.builder() |
| 177 | + .serviceUrl("http://127.0.0.1:6650") |
| 178 | + .build(); |
| 179 | + |
| 180 | +SchemaRegistryClient schemaClient = client.newSchemaRegistry(); |
| 181 | + |
| 182 | +String jsonSchema = """ |
| 183 | + { |
| 184 | + "type": "object", |
| 185 | + "properties": { |
| 186 | + "field1": {"type": "string"}, |
| 187 | + "field2": {"type": "integer"} |
| 188 | + } |
| 189 | + }"""; |
| 190 | + |
| 191 | +// Register a JSON schema |
| 192 | +schemaClient.registerSchema( |
| 193 | + schemaClient.newRegistration() |
| 194 | + .withSubject("my-app-events") |
| 195 | + .withSchemaType(SchemaType.JSON_SCHEMA) |
| 196 | + .withSchemaDefinition(jsonSchema.getBytes())); |
| 197 | + |
| 198 | +// Create producer with schema reference |
| 199 | +Producer producer = client.newProducer() |
| 200 | + .withTopic("/default/test_topic") |
| 201 | + .withName("schema_producer") |
| 202 | + .withSchemaLatest("my-app-events") |
| 203 | + .build(); |
| 204 | + |
| 205 | +producer.create(); |
108 | 206 | ``` |
109 | 207 |
|
110 | | -## Examples |
| 208 | +Browse the [examples directory](https://github.com/danube-messaging/danube-java/tree/main/examples) for complete working code. |
| 209 | + |
| 210 | +## Contribution |
| 211 | + |
| 212 | +Working on improving and adding new features. Please feel free to contribute or report any issues you encounter. |
111 | 213 |
|
112 | | -Standalone runnable examples are available in: |
| 214 | +### Running Integration Tests |
| 215 | + |
| 216 | +Before submitting a PR, start the test cluster and run the integration tests: |
| 217 | + |
| 218 | +```bash |
| 219 | +# 1. Start the cluster |
| 220 | +cd docker/ |
| 221 | +docker compose up -d |
113 | 222 |
|
114 | | -- [`examples/`](./examples) |
| 223 | +# 2. Wait for the broker to be healthy |
| 224 | +docker compose ps |
115 | 225 |
|
116 | | -## Release preparation |
| 226 | +# 3. Run the integration tests from the repository root |
| 227 | +cd .. |
| 228 | +mvn -pl danube-client -Pfailsafe verify |
| 229 | + |
| 230 | +# 4. Stop the cluster when done |
| 231 | +cd docker/ |
| 232 | +docker compose down -v |
| 233 | +``` |
| 234 | + |
| 235 | +### Repository layout |
| 236 | + |
| 237 | +- `danube-client-proto` — generated protobuf + gRPC Java stubs |
| 238 | +- `danube-client` — handwritten Java client API and internals |
| 239 | + |
| 240 | +### Regenerating gRPC stubs |
| 241 | + |
| 242 | +Make sure the proto files are the latest from the [Danube project](https://github.com/danube-messaging/danube/tree/main/danube-core/proto). |
| 243 | + |
| 244 | +Copy all `.proto` files into: |
| 245 | + |
| 246 | +``` |
| 247 | +danube-client-proto/src/main/proto/ |
| 248 | + DanubeApi.proto |
| 249 | + SchemaRegistry.proto |
| 250 | +``` |
| 251 | + |
| 252 | +No manual `protoc` install is required — Maven downloads the compiler and plugin automatically. Regenerate from the repo root: |
| 253 | + |
| 254 | +```bash |
| 255 | +mvn -pl danube-client-proto -am generate-sources |
| 256 | +``` |
| 257 | + |
| 258 | +Or via the helper script: |
| 259 | + |
| 260 | +```bash |
| 261 | +bash scripts/generate-stubs.sh |
| 262 | +``` |
| 263 | + |
| 264 | +Generated sources will be in: |
| 265 | + |
| 266 | +- `danube-client-proto/target/generated-sources/protobuf/java` |
| 267 | +- `danube-client-proto/target/generated-sources/protobuf/grpc-java` |
| 268 | + |
| 269 | +### Build all modules |
| 270 | + |
| 271 | +```bash |
| 272 | +mvn clean verify |
| 273 | +``` |
117 | 274 |
|
118 | | -For Maven Central publishing and signing workflow, see: |
| 275 | +### Release |
119 | 276 |
|
120 | | -- [`RELEASE.md`](./RELEASE.md) |
| 277 | +For Maven Central publishing and signing workflow, see [`RELEASE.md`](./RELEASE.md). |
0 commit comments