Skip to content

Commit f079566

Browse files
authored
Merge pull request #3 from danube-messaging/intro_nack
impl nack consumer response
2 parents 50f45ac + 2108b3c commit f079566

5 files changed

Lines changed: 228 additions & 9 deletions

File tree

danube-client-proto/src/main/proto/DanubeApi.proto

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ service ConsumerService {
6363

6464
// Acknowledges receipt of a message from the Consumer
6565
rpc Ack(AckRequest) returns (AckResponse);
66+
67+
// Negative acknowledgment for a message from the Consumer
68+
rpc Nack(NackRequest) returns (NackResponse);
6669
}
6770

6871
// Create Consumer request
@@ -136,6 +139,20 @@ message AckResponse {
136139
uint64 request_id = 1;
137140
}
138141

142+
message NackRequest {
143+
uint64 request_id = 1;
144+
// Identifies the message, associated with a unique topic, subscription and the broker
145+
MsgID msg_id = 2;
146+
// Subscription name the consumer is subscribed to
147+
string subscription_name = 3;
148+
optional uint64 delay_ms = 4;
149+
optional string reason = 5;
150+
}
151+
152+
message NackResponse {
153+
uint64 request_id = 1;
154+
}
155+
139156
// ============================================================================================
140157

141158
service Discovery {

danube-client/src/main/java/com/danubemessaging/client/Consumer.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,44 @@ public void ack(StreamMessage message) {
173173
notifyMessageAcked(topicConsumer, message);
174174
}
175175

176+
/**
177+
* Negatively acknowledges a message asynchronously.
178+
*
179+
* @param message the message to negatively acknowledge
180+
* @param delayMs optional redelivery delay in milliseconds; may be null
181+
* @param reason optional reason for the nack; may be null
182+
* @return a future that completes when the nack is sent
183+
*/
184+
public CompletableFuture<Void> nackAsync(StreamMessage message, Long delayMs, String reason) {
185+
return CompletableFuture.runAsync(() -> nack(message, delayMs, reason), client.ioExecutor());
186+
}
187+
188+
/**
189+
* Negatively acknowledges a message, signaling the broker that processing failed.
190+
* The broker may redeliver the message depending on the subscription's failure policy.
191+
*
192+
* @param message the message to negatively acknowledge; must not be null
193+
* @param delayMs optional redelivery delay in milliseconds; may be null
194+
* @param reason optional reason for the nack; may be null
195+
* @throws com.danubemessaging.client.errors.DanubeClientException if the message's topic
196+
* has no associated consumer or the consumer is closed
197+
*/
198+
public void nack(StreamMessage message, Long delayMs, String reason) {
199+
ensureOpen();
200+
201+
if (message == null) {
202+
throw new DanubeClientException("Message is required for nack");
203+
}
204+
205+
TopicConsumer topicConsumer = consumerByTopic.get(message.messageId().topicName());
206+
if (topicConsumer == null) {
207+
throw new DanubeClientException(
208+
"No consumer found for topic in message id: " + message.messageId().topicName());
209+
}
210+
211+
topicConsumer.nack(message, delayMs, reason);
212+
}
213+
176214
/**
177215
* Closes this consumer, cancels the receive loop, and releases all resources.
178216
* Idempotent — safe to call multiple times.

danube-client/src/main/java/com/danubemessaging/client/internal/consumer/TopicConsumer.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,61 @@ public void ack(StreamMessage message) {
229229
}
230230
}
231231

232+
public void nack(StreamMessage message, Long delayMs, String reason) {
233+
int attempts = 0;
234+
while (true) {
235+
ensureOpen();
236+
237+
if (consumerId == 0) {
238+
throw new DanubeClientException("Consumer is not subscribed for topic: " + topic);
239+
}
240+
241+
var connection = connectionManager.getConnection(brokerAddress.brokerUrl(), brokerAddress.connectUrl());
242+
var stub = ConsumerServiceGrpc.newBlockingStub(connection.grpcChannel())
243+
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata()));
244+
245+
var builder = DanubeApi.NackRequest.newBuilder()
246+
.setRequestId(requestId.incrementAndGet())
247+
.setMsgId(message.messageId().toProto())
248+
.setSubscriptionName(options.subscription());
249+
if (delayMs != null) {
250+
builder.setDelayMs(delayMs);
251+
}
252+
if (reason != null) {
253+
builder.setReason(reason);
254+
}
255+
DanubeApi.NackRequest request = builder.build();
256+
257+
try {
258+
stub.nack(request);
259+
return;
260+
} catch (RuntimeException error) {
261+
if (retryManager.isUnrecoverable(error)) {
262+
notifyError(error, true);
263+
relookupAndResubscribe();
264+
attempts = 0;
265+
continue;
266+
}
267+
268+
if (!retryManager.isRetryable(error)) {
269+
notifyError(error, false);
270+
throw new DanubeClientException("Failed to nack message for topic: " + topic, error);
271+
}
272+
273+
notifyError(error, true);
274+
275+
attempts++;
276+
if (attempts > retryManager.maxRetries()) {
277+
relookupAndResubscribe();
278+
attempts = 0;
279+
continue;
280+
}
281+
282+
sleepBackoff(retryManager.calculateBackoff(attempts - 1));
283+
}
284+
}
285+
}
286+
232287
public synchronized void relookupAndResubscribe() {
233288
ensureOpen();
234289
cancelHealthCheckTask();
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package com.danubemessaging.client.it;
2+
3+
import com.danubemessaging.client.Consumer;
4+
import com.danubemessaging.client.DanubeClient;
5+
import com.danubemessaging.client.DispatchStrategy;
6+
import com.danubemessaging.client.Producer;
7+
import com.danubemessaging.client.SubType;
8+
import com.danubemessaging.client.model.StreamMessage;
9+
import org.junit.jupiter.api.Test;
10+
11+
import java.util.Map;
12+
import java.util.concurrent.CountDownLatch;
13+
import java.util.concurrent.TimeUnit;
14+
import java.util.concurrent.atomic.AtomicInteger;
15+
import java.util.concurrent.atomic.AtomicReference;
16+
17+
import static com.danubemessaging.client.it.TestHelpers.*;
18+
import static org.junit.jupiter.api.Assertions.*;
19+
20+
/**
21+
* Reliable nack tests: verify that nacking a message on a reliable-dispatch
22+
* topic causes the broker to redeliver the same message.
23+
*/
24+
class ReliableNackIT {
25+
26+
private void runReliableNack(String topicPrefix, SubType subType) throws Exception {
27+
DanubeClient client = newClient();
28+
String topic = uniqueTopic(topicPrefix);
29+
30+
Producer producer = client.newProducer()
31+
.withTopic(topic)
32+
.withName("producer_reliable_nack")
33+
.withDispatchStrategy(DispatchStrategy.RELIABLE)
34+
.build();
35+
producer.create();
36+
37+
Consumer consumer = client.newConsumer()
38+
.withTopic(topic)
39+
.withConsumerName("cons_rel_nack")
40+
.withSubscription("rel_sub_nack")
41+
.withSubscriptionType(subType)
42+
.build();
43+
consumer.subscribe();
44+
45+
try {
46+
byte[] payload = "nack-redelivery-test".getBytes();
47+
48+
AtomicReference<StreamMessage> firstDelivery = new AtomicReference<>();
49+
AtomicReference<StreamMessage> redelivery = new AtomicReference<>();
50+
AtomicReference<Throwable> error = new AtomicReference<>();
51+
AtomicInteger deliveryCount = new AtomicInteger();
52+
CountDownLatch done = new CountDownLatch(1);
53+
54+
consumer.receive().subscribe(new TestHelpers.MessageCollector(2) {
55+
@Override
56+
public void onNext(StreamMessage item) {
57+
try {
58+
int count = deliveryCount.incrementAndGet();
59+
if (count == 1) {
60+
// First delivery — nack it
61+
firstDelivery.set(item);
62+
assertArrayEquals(payload, item.payload(), "first delivery payload mismatch");
63+
consumer.nack(item, 0L, "testing nack redelivery");
64+
} else if (count == 2) {
65+
// Redelivered message — verify and ack
66+
redelivery.set(item);
67+
assertArrayEquals(payload, item.payload(), "redelivered payload mismatch");
68+
assertEquals(firstDelivery.get().messageId().topicOffset(),
69+
item.messageId().topicOffset(),
70+
"redelivered message should have same offset");
71+
consumer.ack(item);
72+
done.countDown();
73+
}
74+
} catch (Throwable t) {
75+
error.set(t);
76+
done.countDown();
77+
}
78+
}
79+
});
80+
81+
Thread.sleep(400);
82+
83+
producer.send(payload, Map.of());
84+
85+
assertTrue(done.await(15, TimeUnit.SECONDS),
86+
"Timeout: received " + deliveryCount.get() + "/2 deliveries");
87+
88+
if (error.get() != null) {
89+
fail("Assertion failed in subscriber: " + error.get().getMessage());
90+
}
91+
92+
assertNotNull(firstDelivery.get(), "first delivery should not be null");
93+
assertNotNull(redelivery.get(), "redelivery should not be null");
94+
} finally {
95+
consumer.close();
96+
client.close();
97+
}
98+
}
99+
100+
@Test
101+
void reliableNackExclusive() throws Exception {
102+
runReliableNack("/default/reliable_nack_exclusive", SubType.EXCLUSIVE);
103+
}
104+
105+
@Test
106+
void reliableNackShared() throws Exception {
107+
runReliableNack("/default/reliable_nack_shared", SubType.SHARED);
108+
}
109+
}

docker/danube_broker.yml

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,15 @@ auto_create_topics: true
3131
# Security Configuration
3232
auth:
3333
mode: none # Options: none, tls, tlswithjwt
34-
tls:
35-
cert_file: "./cert/server-cert.pem"
36-
key_file: "./cert/server-key.pem"
37-
ca_file: "./cert/ca-cert.pem"
38-
verify_client: false
39-
jwt:
40-
secret_key: "your-secret-key"
41-
issuer: "danube-auth"
42-
expiration_time: 3600 # in seconds
34+
# tls:
35+
# cert_file: "./cert/server-cert.pem"
36+
# key_file: "./cert/server-key.pem"
37+
# ca_file: "./cert/ca-cert.pem"
38+
# verify_client: false
39+
# jwt:
40+
# secret_key: "your-secret-key"
41+
# issuer: "danube-auth"
42+
# expiration_time: 3600 # in seconds
4343

4444
# Load Manager Configuration (Automated Proactive Rebalancing)
4545
load_manager:

0 commit comments

Comments
 (0)