Skip to content

Commit 17552b1

Browse files
committed
create the examples that include reliable and partitioned topics
1 parent 5b26688 commit 17552b1

9 files changed

Lines changed: 718 additions & 9 deletions

examples/JsonConsumer.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import com.danubemessaging.client.Consumer;
2+
import com.danubemessaging.client.DanubeClient;
3+
import com.danubemessaging.client.SubType;
4+
import com.danubemessaging.client.model.StreamMessage;
5+
6+
import java.util.concurrent.CountDownLatch;
7+
import java.util.concurrent.Flow;
8+
9+
/**
10+
* JSON consumer example: subscribes to a topic and prints received JSON payloads.
11+
*
12+
* Start this consumer BEFORE running JsonProducer.java.
13+
*
14+
* Prerequisites: Danube broker running on localhost:6650
15+
* cd docker && docker compose up -d
16+
*/
17+
public class JsonConsumer {
18+
19+
private static final String BROKER_URL = System.getenv().getOrDefault("DANUBE_BROKER_URL", "http://127.0.0.1:6650");
20+
private static final String TOPIC = "/default/json_topic";
21+
22+
public static void main(String[] args) throws Exception {
23+
DanubeClient client = DanubeClient.builder()
24+
.serviceUrl(BROKER_URL)
25+
.build();
26+
27+
Consumer consumer = client.newConsumer()
28+
.withTopic(TOPIC)
29+
.withConsumerName("cons_json")
30+
.withSubscription("subs_json")
31+
.withSubscriptionType(SubType.EXCLUSIVE)
32+
.build();
33+
34+
consumer.subscribe();
35+
System.out.println("Consumer subscribed — waiting for messages...");
36+
37+
// Infinite receive loop — press Ctrl+C to stop
38+
CountDownLatch shutdown = new CountDownLatch(1);
39+
40+
Runtime.getRuntime().addShutdownHook(Thread.ofVirtual().unstarted(() -> {
41+
consumer.close();
42+
client.close();
43+
shutdown.countDown();
44+
}));
45+
46+
consumer.receive().subscribe(new Flow.Subscriber<>() {
47+
@Override
48+
public void onSubscribe(Flow.Subscription subscription) {
49+
subscription.request(Long.MAX_VALUE);
50+
}
51+
52+
@Override
53+
public void onNext(StreamMessage msg) {
54+
String json = new String(msg.payload());
55+
System.out.printf("Received (schema_version=%d): %s%n",
56+
msg.schemaVersion() != null ? msg.schemaVersion() : 0, json);
57+
consumer.ack(msg);
58+
}
59+
60+
@Override
61+
public void onError(Throwable t) {
62+
System.err.println("Receive error: " + t.getMessage());
63+
shutdown.countDown();
64+
}
65+
66+
@Override
67+
public void onComplete() {
68+
shutdown.countDown();
69+
}
70+
});
71+
72+
shutdown.await();
73+
}
74+
}

examples/JsonProducer.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import com.danubemessaging.client.DanubeClient;
2+
import com.danubemessaging.client.Producer;
3+
import com.danubemessaging.client.SchemaRegistryClient;
4+
import com.danubemessaging.client.schema.SchemaType;
5+
6+
import java.util.Map;
7+
8+
/**
9+
* JSON producer example: registers a JSON schema in the schema registry and
10+
* produces schema-tagged messages to a topic.
11+
*
12+
* Run the JSON consumer (JsonConsumer.java) in a separate terminal first.
13+
*
14+
* Prerequisites: Danube broker running on localhost:6650
15+
* cd docker && docker compose up -d
16+
*/
17+
public class JsonProducer {
18+
19+
private static final String BROKER_URL = System.getenv().getOrDefault("DANUBE_BROKER_URL", "http://127.0.0.1:6650");
20+
private static final String TOPIC = "/default/json_topic";
21+
private static final String SUBJECT = "my-app-events";
22+
23+
private static final String JSON_SCHEMA = """
24+
{
25+
"type": "object",
26+
"properties": {
27+
"field1": {"type": "string"},
28+
"field2": {"type": "integer"}
29+
}
30+
}""";
31+
32+
public static void main(String[] args) throws Exception {
33+
DanubeClient client = DanubeClient.builder()
34+
.serviceUrl(BROKER_URL)
35+
.build();
36+
37+
// Register schema in schema registry
38+
SchemaRegistryClient schemaClient = client.newSchemaRegistry();
39+
40+
var registration = schemaClient.registerSchema(
41+
schemaClient.newRegistration()
42+
.withSubject(SUBJECT)
43+
.withSchemaType(SchemaType.JSON_SCHEMA)
44+
.withSchemaDefinition(JSON_SCHEMA.getBytes()));
45+
46+
System.out.printf("Registered schema '%s' — id=%d version=%d%n",
47+
SUBJECT, registration.schemaId(), registration.version());
48+
49+
// Create producer pinned to the latest schema for this subject
50+
Producer producer = client.newProducer()
51+
.withTopic(TOPIC)
52+
.withName("prod_json")
53+
.withSchemaLatest(SUBJECT)
54+
.build();
55+
56+
producer.create();
57+
System.out.println("Producer created");
58+
59+
for (int i = 0; i < 10; i++) {
60+
String json = String.format("{\"field1\": \"value%d\", \"field2\": %d}", i, 2020 + i);
61+
long msgId = producer.send(json.getBytes(), Map.of());
62+
System.out.printf("Sent message #%d (id=%d): %s%n", i, msgId, json);
63+
Thread.sleep(1000);
64+
}
65+
66+
client.close();
67+
}
68+
}

examples/PartitionsConsumer.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import com.danubemessaging.client.Consumer;
2+
import com.danubemessaging.client.DanubeClient;
3+
import com.danubemessaging.client.SubType;
4+
import com.danubemessaging.client.model.StreamMessage;
5+
6+
import java.util.concurrent.CountDownLatch;
7+
import java.util.concurrent.Flow;
8+
9+
/**
10+
* Partitioned consumer example: subscribes to a partitioned topic and prints
11+
* each message along with the partition it arrived from.
12+
*
13+
* Start this consumer BEFORE running PartitionsProducer.java.
14+
*
15+
* Prerequisites: Danube broker running on localhost:6650
16+
* cd docker && docker compose up -d
17+
*/
18+
public class PartitionsConsumer {
19+
20+
private static final String BROKER_URL = System.getenv().getOrDefault("DANUBE_BROKER_URL", "http://127.0.0.1:6650");
21+
private static final String TOPIC = "/default/partitioned_topic";
22+
23+
public static void main(String[] args) throws Exception {
24+
DanubeClient client = DanubeClient.builder()
25+
.serviceUrl(BROKER_URL)
26+
.build();
27+
28+
Consumer consumer = client.newConsumer()
29+
.withTopic(TOPIC)
30+
.withConsumerName("cons_part")
31+
.withSubscription("subs_part")
32+
.withSubscriptionType(SubType.EXCLUSIVE)
33+
.build();
34+
35+
consumer.subscribe();
36+
System.out.println("Consumer subscribed — waiting for messages across all partitions...");
37+
38+
CountDownLatch shutdown = new CountDownLatch(1);
39+
40+
Runtime.getRuntime().addShutdownHook(Thread.ofVirtual().unstarted(() -> {
41+
consumer.close();
42+
client.close();
43+
shutdown.countDown();
44+
}));
45+
46+
consumer.receive().subscribe(new Flow.Subscriber<>() {
47+
@Override
48+
public void onSubscribe(Flow.Subscription subscription) {
49+
subscription.request(Long.MAX_VALUE);
50+
}
51+
52+
@Override
53+
public void onNext(StreamMessage msg) {
54+
String payload = new String(msg.payload());
55+
System.out.printf("Received from partition '%s': %s%n",
56+
msg.messageId().topicName(), payload);
57+
consumer.ack(msg);
58+
}
59+
60+
@Override
61+
public void onError(Throwable t) {
62+
System.err.println("Receive error: " + t.getMessage());
63+
shutdown.countDown();
64+
}
65+
66+
@Override
67+
public void onComplete() {
68+
shutdown.countDown();
69+
}
70+
});
71+
72+
shutdown.await();
73+
}
74+
}

examples/PartitionsProducer.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import com.danubemessaging.client.DanubeClient;
2+
import com.danubemessaging.client.Producer;
3+
4+
import java.util.Map;
5+
6+
/**
7+
* Partitioned producer example: creates a producer with 3 partitions and
8+
* round-robins messages across them.
9+
*
10+
* Run PartitionsConsumer.java in a separate terminal to receive these messages.
11+
*
12+
* Prerequisites: Danube broker running on localhost:6650
13+
* cd docker && docker compose up -d
14+
*/
15+
public class PartitionsProducer {
16+
17+
private static final String BROKER_URL = System.getenv().getOrDefault("DANUBE_BROKER_URL", "http://127.0.0.1:6650");
18+
private static final String TOPIC = "/default/partitioned_topic";
19+
20+
public static void main(String[] args) throws Exception {
21+
DanubeClient client = DanubeClient.builder()
22+
.serviceUrl(BROKER_URL)
23+
.build();
24+
25+
Producer producer = client.newProducer()
26+
.withTopic(TOPIC)
27+
.withName("prod_part")
28+
.withPartitions(3)
29+
.build();
30+
31+
producer.create();
32+
System.out.println("Partitioned producer created (3 partitions)");
33+
34+
for (int i = 0; i < 20; i++) {
35+
String payload = "Hello Danube " + i;
36+
long msgId = producer.send(payload.getBytes(), Map.of());
37+
System.out.printf("Sent message #%d (id=%d): %s%n", i, msgId, payload);
38+
Thread.sleep(500);
39+
}
40+
41+
System.out.println("Done");
42+
client.close();
43+
}
44+
}

examples/README.md

Lines changed: 63 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,73 @@
1-
# Danube Java examples
1+
# Danube Java Examples
22

3-
These standalone examples demonstrate common Danube Java client workflows.
3+
Standalone Java examples demonstrating common Danube messaging client workflows.
4+
Each file is a self-contained program with a `main` method — no build system required.
45

5-
## Files
6+
## Examples
67

7-
- `SchemaRegistryProducerExample.java`: register schema + build producer with schema reference
8-
- `ConsumerObservabilityExample.java`: subscribe consumer with lifecycle/message/error listeners
8+
| File | Description |
9+
|------|-------------|
10+
| `SimpleProducerConsumer.java` | Basic producer + consumer, raw byte messages |
11+
| `JsonProducer.java` | Register a JSON schema and produce schema-tagged messages |
12+
| `JsonConsumer.java` | Consume and print JSON messages |
13+
| `PartitionsProducer.java` | Produce across 3 topic partitions |
14+
| `PartitionsConsumer.java` | Consume from all partitions, showing which partition each message arrived from |
15+
| `ReliableDispatchProducer.java` | Reliable dispatch: broker waits for ack before sending next message |
16+
| `ReliableDispatchConsumer.java` | Consume reliable messages and report byte throughput |
17+
| `SchemaEvolution.java` | Register, evolve, and compatibility-check schemas in the registry |
918

10-
## Run locally
19+
## Prerequisites
1120

12-
1. Start a Danube broker locally.
13-
2. Build artifacts:
21+
Start the Danube broker using Docker Compose:
1422

1523
```bash
24+
cd docker
25+
docker compose up -d
26+
```
27+
28+
Build the client jars:
29+
30+
```bash
31+
# From the repo root
1632
mvn -DskipTests package
1733
```
1834

19-
3. Compile and run an example with your preferred IDE or local `javac/java` setup, ensuring `danube-client` and `danube-client-proto` are on the classpath.
35+
## Running an Example
36+
37+
Set up the classpath with both the client and its proto dependency:
38+
39+
```bash
40+
CP=danube-client/target/danube-client-0.2.0.jar:danube-client-proto/target/danube-client-proto-0.2.0.jar
41+
42+
# Compile
43+
javac -cp "$CP" examples/SimpleProducerConsumer.java -d examples/out
44+
45+
# Run
46+
java -cp "$CP:examples/out" SimpleProducerConsumer
47+
```
48+
49+
Override the broker URL via environment variable if needed:
50+
51+
```bash
52+
DANUBE_BROKER_URL=http://my-broker:6650 java -cp "$CP:examples/out" SimpleProducerConsumer
53+
```
54+
55+
## Producer / Consumer Pairs
56+
57+
These examples are designed to be run together in separate terminals:
58+
59+
| Producer | Consumer | Topic |
60+
|----------|----------|-------|
61+
| `JsonProducer` | `JsonConsumer` | `/default/json_topic` |
62+
| `PartitionsProducer` | `PartitionsConsumer` | `/default/partitioned_topic` |
63+
| `ReliableDispatchProducer` | `ReliableDispatchConsumer` | `/default/reliable_topic` |
64+
65+
**Always start the consumer first** so it is subscribed before the producer sends.
66+
67+
## Key Concepts
68+
69+
- **`producer.create()`** — registers the producer with the broker (creates topic if needed)
70+
- **`consumer.subscribe()`** — subscribes to the topic, starts receive loops on virtual threads
71+
- **`consumer.receive()`** — returns a `Flow.Publisher<StreamMessage>`; attach a `Flow.Subscriber` to receive messages
72+
- **`consumer.ack(msg)`** — acknowledges a message; required in reliable dispatch mode
73+
- **Schema registry** — register schemas via `client.newSchemaRegistry()`, then use `.withSchemaLatest(subject)` on the producer builder

0 commit comments

Comments
 (0)