-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathConsumerBuilder.java
More file actions
162 lines (144 loc) · 4.82 KB
/
Copy pathConsumerBuilder.java
File metadata and controls
162 lines (144 loc) · 4.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package com.danubemessaging.client;
import com.danubemessaging.client.errors.DanubeClientException;
import java.util.ArrayList;
import java.util.List;
/**
 * Fluent builder that gathers the settings for a {@link Consumer} and creates it.
 *
 * <p>Topic, consumer name and subscription are mandatory: {@link #build()} throws if any of
 * them is {@code null} or blank. All other settings are optional. Every {@code with...}
 * method returns this builder so calls can be chained.
 */
public final class ConsumerBuilder {
    private final DanubeClient client;
    private String topic;
    private String consumerName;
    private String subscription;
    private SubType subType = SubType.SHARED;
    private final List<String> keyFilters = new ArrayList<>();
    private ConsumerEventListener eventListener = ConsumerEventListener.noop();
    private int maxRetries;
    private long baseBackoffMs;
    private long maxBackoffMs;

    /**
     * Creates a builder bound to the given client. Package-private, so builders can only be
     * created from within this package.
     *
     * @param client the client handed to the {@link Consumer} created by {@link #build()}
     */
    ConsumerBuilder(DanubeClient client) {
        this.client = client;
    }

    /**
     * Sets the fully-qualified topic name. Required.
     *
     * @param topic e.g. {@code /default/my-topic}
     * @return this builder
     */
    public ConsumerBuilder withTopic(String topic) {
        this.topic = topic;
        return this;
    }

    /**
     * Sets the consumer name. Required. Used to identify this consumer on the broker.
     *
     * @param consumerName a unique identifier for this consumer
     * @return this builder
     */
    public ConsumerBuilder withConsumerName(String consumerName) {
        this.consumerName = consumerName;
        return this;
    }

    /**
     * Sets the subscription name. Required.
     * Multiple consumers sharing the same subscription name form a subscription group.
     *
     * @param subscription the subscription name
     * @return this builder
     */
    public ConsumerBuilder withSubscription(String subscription) {
        this.subscription = subscription;
        return this;
    }

    /**
     * Sets the subscription type. Defaults to {@link SubType#SHARED}.
     * A {@code null} argument is ignored and the current type is kept.
     *
     * @param subType {@link SubType#EXCLUSIVE}, {@link SubType#SHARED},
     *                {@link SubType#FAILOVER}, or {@link SubType#KEY_SHARED}
     * @return this builder
     */
    public ConsumerBuilder withSubscriptionType(SubType subType) {
        if (subType == null) {
            return this;
        }
        this.subType = subType;
        return this;
    }

    /**
     * Adds a single key filter pattern for {@link SubType#KEY_SHARED} subscriptions.
     * Uses glob syntax: {@code "user-*"}, {@code "eu-west-?"}, {@code "*"}.
     * A {@code null} or blank pattern is ignored.
     *
     * @param pattern the key filter pattern
     * @return this builder
     */
    public ConsumerBuilder withKeyFilter(String pattern) {
        if (hasText(pattern)) {
            keyFilters.add(pattern);
        }
        return this;
    }

    /**
     * Adds multiple key filter patterns for {@link SubType#KEY_SHARED} subscriptions.
     * A {@code null} list is ignored, as are {@code null} or blank entries within the list.
     * Patterns are appended to those already added; nothing is replaced.
     *
     * @param patterns the key filter patterns
     * @return this builder
     */
    public ConsumerBuilder withKeyFilters(List<String> patterns) {
        if (patterns == null) {
            return this;
        }
        for (String candidate : patterns) {
            // Same null/blank screening as the single-pattern variant.
            withKeyFilter(candidate);
        }
        return this;
    }

    /**
     * Sets a listener for consumer lifecycle and message events.
     * A {@code null} argument is ignored and the current listener is kept.
     *
     * @param eventListener the listener; no-op by default
     * @return this builder
     */
    public ConsumerBuilder withEventListener(ConsumerEventListener eventListener) {
        if (eventListener == null) {
            return this;
        }
        this.eventListener = eventListener;
        return this;
    }

    /**
     * Sets the maximum number of receive retries before giving up.
     * The value is stored as given; no range check is performed here.
     *
     * @param maxRetries number of retries; 0 means use the client default
     * @return this builder
     */
    public ConsumerBuilder withMaxRetries(int maxRetries) {
        this.maxRetries = maxRetries;
        return this;
    }

    /**
     * Sets the base backoff duration between retries (milliseconds).
     * The value is stored as given; no range check is performed here.
     *
     * @param baseBackoffMs base backoff in ms; 0 means use the client default
     * @return this builder
     */
    public ConsumerBuilder withBaseBackoffMs(long baseBackoffMs) {
        this.baseBackoffMs = baseBackoffMs;
        return this;
    }

    /**
     * Sets the maximum backoff duration between retries (milliseconds).
     * The value is stored as given; no range check is performed here.
     *
     * @param maxBackoffMs max backoff in ms; 0 means use the client default
     * @return this builder
     */
    public ConsumerBuilder withMaxBackoffMs(long maxBackoffMs) {
        this.maxBackoffMs = maxBackoffMs;
        return this;
    }

    /**
     * Validates the required settings and creates the {@link Consumer}.
     *
     * <p>The key filters are copied into an unmodifiable list, so filters added to this
     * builder afterwards do not affect options that were already built.
     *
     * @return a new consumer created from this builder's client and the collected options
     * @throws DanubeClientException if the topic, consumer name, or subscription is
     *                               {@code null} or blank (checked in that order)
     */
    public Consumer build() {
        if (!hasText(topic)) {
            throw new DanubeClientException("Consumer topic is required");
        }
        if (!hasText(consumerName)) {
            throw new DanubeClientException("Consumer name is required");
        }
        if (!hasText(subscription)) {
            throw new DanubeClientException("Subscription is required");
        }

        List<String> filterSnapshot = List.copyOf(keyFilters);
        ConsumerOptions options = new ConsumerOptions(
                topic,
                consumerName,
                subscription,
                subType,
                filterSnapshot,
                eventListener,
                maxRetries,
                baseBackoffMs,
                maxBackoffMs);
        return new Consumer(client, options);
    }

    /** Returns {@code true} when the value is neither {@code null} nor blank. */
    private static boolean hasText(String value) {
        return value != null && !value.isBlank();
    }
}