Skip to content

Commit a2fd5d8

Browse files
authored
DOC-5694 astra-streaming-examples and pulsar-subscription-example archived (#169)
* pulsar streaming examples * grafana dashboards * working on client examples * java examples
1 parent 9d75186 commit a2fd5d8

24 files changed

Lines changed: 10159 additions & 215 deletions

antora.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ asciidoc:
4747
starlight-kafka: 'Starlight for Kafka'
4848
starlight-rabbitmq: 'Starlight for RabbitMQ'
4949
gpt-schema-translator: 'GPT schema translator'
50-
astra-streaming-examples-repo: 'https://github.com/datastax/astra-streaming-examples'
5150

5251
# Required for include::common partials; some shared with Luna Streaming
5352
web-ui: '{astra-ui}'

modules/ROOT/pages/astream-subscriptions-exclusive.adoc

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,57 @@
11
= Exclusive subscriptions in {pulsar-reg}
22
:navtitle: Exclusive
3+
:subscription-comment: // Subscription type is set to Exclusive
4+
:subscription-type: .subscriptionType(SubscriptionType.Exclusive)
35

46
_Subscriptions_ in {pulsar-reg} describe which consumers are consuming data from a topic and how they want to consume that data.
57

68
An _exclusive subscription_ describes a basic publish-subscribe (pub-sub) pattern where a single consumer subscribes to a single topic and consumes from it.
79

810
This page explains how to use {pulsar-short}'s exclusive subscription model to manage your topic consumption.
911
12+
== Prerequisites
13+
1014
include::ROOT:partial$subscription-prereq.adoc[]
1115
16+
== Prepare example project and producer
17+
18+
include::ROOT:partial$subscription-setup-project.adoc[]
19+
20+
1221
[#example]
13-
== Exclusive subscription example
22+
== Create consumer with exclusive subscription
1423
15-
. To configure a {pulsar-short} exclusive subscription, define a `pulsarConsumer` object in `SimplePulsarConsumer.java`, as you would for other subscription types.
16-
However, you don't need to declare a `subscriptionType`.
17-
Whereas other subscription types required you to declare a specific `subscriptionType`, {pulsar-short} creates an exclusive subscription by default if you don't declare a `subscriptionType`.
24+
To create a {pulsar-short} exclusive subscription, create a `pulsarConsumer` with `{subscription-type}`.
25+
26+
. In `src/main/java/com/datastax/pulsar`, create a `SimplePulsarConsumer.java` file with the following contents:
1827
+
1928
.SimplePulsarConsumer.java
20-
[source,java]
29+
[source,java,subs="+attributes"]
2130
----
22-
pulsarConsumer = pulsarClient.newConsumer(Schema.JSON(DemoBean.class))
23-
.topic("persistent://"
24-
+ conf.getTenantName() + "/"
25-
+ conf.getNamespace() + "/"
26-
+ conf.getTopicName())
27-
.startMessageIdInclusive()
28-
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
29-
.subscriptionName("SimplePulsarConsumer")
30-
.subscribe();
31+
include::ROOT:partial$simplepulsarconsumer.java[]
3132
----
3233
+
33-
If you want to explicitly define an exclusive subscription, you can add `.subscriptionType(SubscriptionType.Exclusive)` to the consumer.
34-
35-
. In the `pulsar-subscription-example` project, run `SimplePulsarConsumer.java` to begin consuming messages.
36-
+
37-
The confirmation message and a cursor appear to indicate the consumer is ready:
34+
Alternatively, you can omit the `.subscriptionType` declaration because `Exclusive` is the default subscription type:
3835
+
39-
.Result
40-
[source,console]
36+
.SimplePulsarConsumer.java with implied exclusive subscription
37+
[source,java]
4138
----
42-
[main] INFO com.datastax.pulsar.Configuration - Configuration has been loaded successfully
43-
...
44-
[pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://<tenant_name>/<namespace>/in][SimplePulsarConsumer] Subscribed to topic on <service_url> -- consumer: 0
39+
pulsarConsumer = pulsarClient.newConsumer(Schema.JSON(DemoBean.class))
40+
.topic("persistent://"
41+
+ conf.getTenantName() + "/"
42+
+ conf.getNamespace() + "/"
43+
+ conf.getTopicName())
44+
.startMessageIdInclusive()
45+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
46+
.subscriptionName("SimplePulsarConsumer")
47+
// No subscriptionType declaration, defaults to Exclusive
48+
.subscribe();
4549
----
4650
51+
== Test exclusive subscription
52+
53+
include::ROOT:partial$subscription-start-consumer.adoc[]
54+
4755
. In a new terminal window, run `SimplePulsarProducer.java` to begin producing messages:
4856
+
4957
.Result

modules/ROOT/pages/astream-subscriptions-failover.adoc

Lines changed: 19 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
= Failover subscriptions in {pulsar-reg}
22
:navtitle: Failover
3+
:subscription-comment: // Subscription type is set to Failover
4+
:subscription-type: .subscriptionType(SubscriptionType.Failover)
35

46
_Subscriptions_ in {pulsar-reg} describe which consumers are consuming data from a topic and how they want to consume that data.
57

@@ -10,39 +12,30 @@ If the primary consumer disconnects, the standby consumers begin consuming the s
1012

1113
This page explains how to use {pulsar-short}'s failover subscription model to manage your topic consumption.
1214
15+
== Prerequisites
16+
1317
include::ROOT:partial$subscription-prereq.adoc[]
1418
19+
== Prepare example project and producer
20+
21+
include::ROOT:partial$subscription-setup-project.adoc[]
22+
1523
[#example]
16-
== Failover subscription example
24+
== Create consumer with failover subscription
1725
18-
To try out a {pulsar-short} failover subscription, add `.subscriptionType(SubscriptionType.Failover)` to the `pulsarConsumer` in `SimplePulsarConsumer.java`:
26+
To create a {pulsar-short} failover subscription, create a `pulsarConsumer` with `{subscription-type}`.
1927
28+
. In `src/main/java/com/datastax/pulsar`, create a `SimplePulsarConsumer.java` file with the following contents:
29+
+
2030
.SimplePulsarConsumer.java
21-
[source,java]
31+
[source,java,subs="+attributes"]
2232
----
23-
pulsarConsumer = pulsarClient.newConsumer(Schema.JSON(DemoBean.class))
24-
.topic("persistent://"
25-
+ conf.getTenantName() + "/"
26-
+ conf.getNamespace() + "/"
27-
+ conf.getTopicName())
28-
.startMessageIdInclusive()
29-
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
30-
.subscriptionName("SimplePulsarConsumer")
31-
.subscriptionType(SubscriptionType.Failover)
32-
.subscribe();
33+
include::ROOT:partial$simplepulsarconsumer.java[]
3334
----
3435
35-
. In the `pulsar-subscription-example` project, run `SimplePulsarConsumer.java` to begin consuming messages as the primary consumer.
36-
+
37-
The confirmation message and a cursor appear to indicate the consumer is ready:
38-
+
39-
.Result
40-
[source,console]
41-
----
42-
[main] INFO com.datastax.pulsar.Configuration - Configuration has been loaded successfully
43-
...
44-
[pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://<tenant_name>/<namespace>/in][SimplePulsarConsumer] Subscribed to topic on <service_url> -- consumer: 0
45-
----
36+
== Test failover subscription
37+
38+
include::ROOT:partial$subscription-start-consumer.adoc[]
4639
4740
. In a new terminal window, run `SimplePulsarProducer.java` to begin producing messages:
4841
+
@@ -70,8 +63,8 @@ In the `SimplePulsarConsumer` terminal, the primary consumer begins consuming me
7063
. In a new terminal window, run a new instance of `SimplePulsarConsumer.java` as a backup consumer.
7164
The backup consumer subscribes to the topic but does not immediately begin consuming messages.
7265
73-
. In the primary `SimplePulsarConsumer` terminal, stop the process (`Ctrl+C`).
74-
In the second `SimplePulsarConsumer` terminal, the backup consumer begins consuming messages where the first consumer left off:
66+
. In your first `SimplePulsarConsumer` terminal, stop the process (`Ctrl+C`), and then switch to your second `SimplePulsarConsumer` terminal.
67+
Notice that the backup consumer begins consuming messages where the first consumer left off:
7568
+
7669
.Result
7770
[source,console]

modules/ROOT/pages/astream-subscriptions-keyshared.adoc

Lines changed: 105 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
= Key shared subscriptions in {pulsar-reg}
22
:navtitle: Key shared
3+
:subscription-comment: // Subscription type is set to Key_Shared
4+
:subscription-type: .subscriptionType(SubscriptionType.Key_Shared)
35

46
_Subscriptions_ in {pulsar-reg} describe which consumers are consuming data from a topic and how they want to consume that data.
57

@@ -14,16 +16,45 @@ Keys are generated with hashing that converts arbitrary values like `topic-name`
1416
1517
This page explains how to use {pulsar-short}'s key shared subscription model to manage your topic consumption.
1618
19+
== Prerequisites
20+
1721
include::ROOT:partial$subscription-prereq.adoc[]
1822
23+
== Prepare example project and producer
24+
25+
include::ROOT:partial$subscription-setup-project.adoc[]
26+
1927
[#example]
20-
== Key shared subscription example
28+
== Create consumer with key shared subscription
29+
30+
To create a {pulsar-short} key shared subscription, create a `pulsarConsumer` with `{subscription-type}` and a `keySharedPolicy` configuration.
31+
The `keySharedPolicy` defines how hashed values are assigned to subscribed consumers.
32+
33+
For the simplest configuration, use the `<<use-autosplithashrange,autoSplitHashRange>>` policy.
34+
35+
If you need to set fixed hash ranges, use the `<<use-stickyhashrange,stickyHashRange>>` policy.
36+
37+
[#use-autosplithashrange]
38+
=== Use autoSplitHashRange
39+
40+
To automatically assign hash ranges to consumers, use the `autoSplitHashRange` policy.
41+
Running multiple consumers with `autoSplitHashRange` balances the messaging load across all available consumers, like a xref:astream-subscriptions-shared.adoc[shared subscription].
2142
22-
. To try out a {pulsar-short} key shared subscription, add `.subscriptionType(SubscriptionType.Key_Shared)` to the `pulsarConsumer` in `SimplePulsarConsumer.java`:
43+
. In `src/main/java/com/datastax/pulsar`, create a `SimplePulsarConsumer.java` file with the following contents:
2344
+
2445
.SimplePulsarConsumer.java
25-
[source,java]
46+
[source,java,subs="+attributes"]
47+
----
48+
include::ROOT:partial$simplepulsarconsumer.java[]
49+
----
50+
51+
. In `pulsarConsumer`, add `.keySharedPolicy(KeySharedPolicy.autoSplitHashRange())`:
52+
+
53+
.SimplePulsarConsumer.java with autoSplitHashRange
54+
[source,java,subs="+attributes"]
2655
----
56+
...
57+
2758
pulsarConsumer = pulsarClient.newConsumer(Schema.JSON(DemoBean.class))
2859
.topic("persistent://"
2960
+ conf.getTenantName() + "/"
@@ -32,19 +63,30 @@ pulsarConsumer = pulsarClient.newConsumer(Schema.JSON(DemoBean.class))
3263
.startMessageIdInclusive()
3364
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
3465
.subscriptionName("SimplePulsarConsumer")
35-
.subscriptionType(SubscriptionType.Key_Shared)
66+
{subscription-comment}
67+
{subscription-type}
68+
// Define key shared policy for hash range assignment
3669
.keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
3770
.subscribe();
71+
72+
...
3873
----
74+
75+
[#use-stickyhashrange]
76+
=== Use stickyHashRange
77+
78+
To set a fixed hash range, use the `stickyHashRange` policy.
79+
This policy requires additional dependencies and producer configuration changes, in addition to the consumer configuration.
80+
81+
. In `src/main/java/com/datastax/pulsar`, create a `SimplePulsarConsumer.java` file with the following contents:
3982
+
40-
The `keySharedPolicy` defines how hashed values are assigned to subscribed consumers.
41-
+
42-
The above example uses `autoSplitHashRange`, which is an auto-hashing policy.
43-
Running multiple consumers with auto-hashing balances the messaging load across all available consumers, like a xref:astream-subscriptions-shared.adoc[shared subscription].
44-
+
45-
If you want to set a fixed hash range, use `KeySharedPolicy.stickyHashRange()`, as demonstrated in the following steps.
83+
.SimplePulsarConsumer.java
84+
[source,java,subs="+attributes"]
85+
----
86+
include::ROOT:partial$simplepulsarconsumer.java[]
87+
----
4688
47-
. To use a sticky hashed key shared subscription, import the following classes to `SimplePulsarConsumer.java`:
89+
. Import the following additional classes that are required for the `stickyHashRange` policy:
4890
+
4991
.SimplePulsarConsumer.java
5092
[source,java]
@@ -54,7 +96,45 @@ import org.apache.pulsar.client.api.KeySharedPolicy;
5496
import org.apache.pulsar.client.api.SubscriptionType;
5597
----
5698
57-
. Add the following classes to `SimplePulsarProducer.java`:
99+
. In `pulsarConsumer`, add the sticky hash key policy: `.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(int,int)))`.
100+
+
101+
The following example sets all possible hashes (`0-65535`) on this subscription to one consumer:
102+
+
103+
.SimplePulsarConsumer.java with stickyHashRange for one consumer
104+
[source,java]
105+
----
106+
...
107+
108+
pulsarConsumer = pulsarClient.newConsumer(Schema.JSON(DemoBean.class))
109+
.topic("persistent://"
110+
+ conf.getTenantName() + "/"
111+
+ conf.getNamespace() + "/"
112+
+ conf.getTopicName())
113+
.startMessageIdInclusive()
114+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
115+
.subscriptionName("SimplePulsarConsumer")
116+
{subscription-comment}
117+
{subscription-type}
118+
// Policy assigns all possible hashes to one consumer
119+
.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(0,65535)))
120+
.subscribe();
121+
122+
...
123+
----
124+
+
125+
To split the hash range between multiple consumers, add a `Range.of()` argument for each consumer with the assigned hash range.
126+
For example:
127+
+
128+
.SimplePulsarConsumer.java with stickyHashRange for two consumers
129+
[source,java]
130+
----
131+
// Policy assigns half of the hash range to one consumer and half to another
132+
.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(0,32767), Range.of(32768,65535)))
133+
----
134+
135+
. Modify the producer configuration to support `stickyHashRange`:
136+
+
137+
.. In `SimplePulsarProducer.java`, import the following classes:
58138
+
59139
.SimplePulsarProducer.java
60140
[source,java]
@@ -63,7 +143,7 @@ import org.apache.pulsar.client.api.BatcherBuilder;
63143
import org.apache.pulsar.client.api.HashingScheme;
64144
----
65145
66-
. In `SimplePulsarProducer.java`, modify the `pulsarProducer` to use the `JavaStringHash` hashing scheme:
146+
.. Configure the `pulsarProducer` to use the `JavaStringHash` hashing scheme:
67147
+
68148
.SimplePulsarProducer.java
69149
[source,java]
@@ -74,32 +154,16 @@ pulsarProducer = pulsarClient
74154
+ conf.getTenantName() + "/"
75155
+ conf.getNamespace() + "/"
76156
+ conf.getTopicName())
157+
// Send messages with the same key to the same consumer
77158
.batcherBuilder(BatcherBuilder.KEY_BASED)
159+
// Generate hash values for messages based on their keys
78160
.hashingScheme(HashingScheme.JavaStringHash)
79161
.create();
80162
----
81163
82-
. In `SimplePulsarConsumer.java`, modify the `pulsarConsumer` to use sticky hashing.
83-
This example sets all possible hashes (`0-65535`) on this subscription to one consumer.
84-
+
85-
.SimplePulsarConsumer.java
86-
[source,java]
87-
----
88-
.subscriptionType(SubscriptionType.Key_Shared)
89-
.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(0,65535)))
90-
----
164+
== Test key shared subscription
91165
92-
. In the `pulsar-examples` project, run `SimplePulsarConsumer.java` to begin consuming messages.
93-
+
94-
The confirmation message and a cursor appear to indicate the consumer is ready:
95-
+
96-
.Result
97-
[source,console]
98-
----
99-
[main] INFO com.datastax.pulsar.Configuration - Configuration has been loaded successfully
100-
...
101-
[pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://<tenant_name>/<namespace>/in][SimplePulsarConsumer] Subscribed to topic on <service_url> -- consumer: 0
102-
----
166+
include::ROOT:partial$subscription-start-consumer.adoc[]
103167
104168
. In a new terminal window, run `SimplePulsarProducer.java` to begin producing messages:
105169
+
@@ -123,9 +187,13 @@ In the `SimplePulsarConsumer` terminal, the consumer begins receiving messages:
123187
124188
. In a new terminal window, try to run a new instance of `SimplePulsarConsumer.java`.
125189
+
126-
The new consumer cannot subscribe to the topic because the `SimplePulsarConsumer` configuration reserved the entire hash range for the first consumer:
190+
If you used `autoSplitHashRange`, then the new consumer subscribes to the topic and consumes messages.
191+
The auto-hashing policy balances hash ranges across available consumers.
127192
+
128-
.Result
193+
If you used sticky hashing with one `Range.of()` argument, then the new consumer cannot subscribe to the topic because the `SimplePulsarConsumer` configuration reserved the entire hash range for the first consumer.
194+
For example:
195+
+
196+
.Result when using sticky hashing limited to one consumer
129197
[source,console]
130198
----
131199
[main] INFO com.datastax.pulsar.Configuration - Configuration has been loaded successfully
@@ -137,8 +205,8 @@ Caused by: org.apache.pulsar.client.api.PulsarClientException$ConsumerAssignExce
137205
at org.apache.pulsar.client.impl.ConsumerBuilderImpl.subscribe(ConsumerBuilderImpl.java:101)
138206
at com.datastax.pulsar.SimplePulsarConsumer.main(SimplePulsarConsumer.java:47)
139207
----
140-
141-
. To run multiple consumers with sticky hashing, modify the `SimplePulsarConsumer.java` configuration to split the hash range between consumers or use auto-hashing.
208+
+
209+
To run multiple consumers with sticky hashing, you must modify `SimplePulsarConsumer.java` to split the hash range between consumers, as explained in <<use-stickyhashrange>>.
142210
Then, you can launch multiple instances of `SimplePulsarConsumer.java` to consume messages from different hash ranges.
143211
144212
== See also

0 commit comments

Comments
 (0)