Skip to content

Commit ebec5ce

Browse files
authored
[feat] PIP-468: V5 dead letter queue with scalable DLQ topic (#25652)
1 parent 3e27cc6 commit ebec5ce

5 files changed

Lines changed: 505 additions & 157 deletions

File tree

pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DeadLetterPolicyTest.java

Lines changed: 178 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,31 +20,34 @@
2020

2121
import static org.testng.Assert.assertEquals;
2222
import static org.testng.Assert.assertNotNull;
23+
import static org.testng.Assert.assertTrue;
2324
import java.time.Duration;
25+
import java.time.Instant;
26+
import java.util.HashSet;
27+
import java.util.Map;
28+
import java.util.Set;
2429
import lombok.Cleanup;
2530
import org.apache.pulsar.client.api.v5.config.BackoffPolicy;
2631
import org.apache.pulsar.client.api.v5.config.DeadLetterPolicy;
2732
import org.apache.pulsar.client.api.v5.schema.Schema;
33+
import org.apache.pulsar.client.util.RetryMessageUtil;
2834
import org.testng.annotations.Test;
2935

3036
/**
3137
* Coverage for {@link QueueConsumerBuilder#deadLetterPolicy(DeadLetterPolicy)}: after
32-
* {@code maxRedeliverCount} negative-acks, the broker forwards the message to the
38+
* {@code maxRedeliverCount} negative-acks, the V5 layer forwards the message to the
3339
* configured dead-letter topic.
3440
*
35-
* <p><b>Known V5 gap:</b> the DLQ topic is currently a non-scalable persistent topic.
36-
* The V5 source consumer's underlying v4 {@code ConsumerImpl} creates the DLQ producer
37-
* via {@code client.newProducer(...)}, which rejects {@code topic://} scalable topic
38-
* names. Routing the DLQ producer through V5's segment-bypass path is required before
39-
* a scalable DLQ can be used here.
41+
* <p>V5 owns the DLQ at the consumer layer (not per-segment), so the DLQ topic is
42+
* itself a scalable {@code topic://} topic and a single shared producer fans messages
43+
* from every segment into it.
4044
*/
4145
public class V5DeadLetterPolicyTest extends V5ClientBaseTest {
4246

4347
@Test
44-
public void testMessageGoesToDlqAfterMaxRedeliveries() throws Exception {
48+
public void testMessageGoesToScalableDlqWhenExplicitlyConfigured() throws Exception {
4549
String topic = newScalableTopic(1);
46-
// Non-scalable DLQ topic — see class-level note about the V5 gap.
47-
String dlqTopic = "persistent://" + getNamespace() + "/dlq-explicit";
50+
String dlqTopic = newScalableTopic(1);
4851

4952
@Cleanup
5053
Producer<String> producer = v5Client.newProducer(Schema.string())
@@ -59,14 +62,10 @@ public void testMessageGoesToDlqAfterMaxRedeliveries() throws Exception {
5962
.deadLetterPolicy(new DeadLetterPolicy(2, null, dlqTopic, null))
6063
.subscribe();
6164

62-
// Subscribe to the DLQ via the V4 client (DLQ is a regular persistent topic).
6365
@Cleanup
64-
org.apache.pulsar.client.api.Consumer<String> dlqConsumer = pulsarClient
65-
.newConsumer(org.apache.pulsar.client.api.Schema.STRING)
66+
QueueConsumer<byte[]> dlqConsumer = v5Client.newQueueConsumer(Schema.bytes())
6667
.topic(dlqTopic)
6768
.subscriptionName("dlq-watcher")
68-
.subscriptionInitialPosition(
69-
org.apache.pulsar.client.api.SubscriptionInitialPosition.Earliest)
7069
.subscribe();
7170

7271
producer.newMessage().value("dead").send();
@@ -81,11 +80,171 @@ public void testMessageGoesToDlqAfterMaxRedeliveries() throws Exception {
8180
consumer.negativeAcknowledge(msg.id());
8281
}
8382

84-
// Now it should appear on the DLQ topic.
85-
org.apache.pulsar.client.api.Message<String> dlqMsg =
86-
dlqConsumer.receive(10, java.util.concurrent.TimeUnit.SECONDS);
83+
Message<byte[]> dlqMsg = dlqConsumer.receive(Duration.ofSeconds(10));
8784
assertNotNull(dlqMsg, "message did not land on the DLQ topic");
88-
assertEquals(dlqMsg.getValue(), "dead");
89-
dlqConsumer.acknowledge(dlqMsg);
85+
assertEquals(new String(dlqMsg.value()), "dead");
86+
dlqConsumer.acknowledge(dlqMsg.id());
87+
}
88+
89+
@Test
90+
public void testMessageGoesToDefaultScalableDlqTopic() throws Exception {
91+
String topic = newScalableTopic(1);
92+
// When deadLetterTopic is null, V5 defaults to topic://<tenant>/<ns>/<source-local>-DLQ.
93+
// Pre-create it so the V5 producer's layout lookup succeeds.
94+
String defaultDlqTopic = topic + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
95+
admin.scalableTopics().createScalableTopic(defaultDlqTopic, 1);
96+
97+
@Cleanup
98+
Producer<String> producer = v5Client.newProducer(Schema.string())
99+
.topic(topic)
100+
.create();
101+
@Cleanup
102+
QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
103+
.topic(topic)
104+
.subscriptionName("default-dlq-sub")
105+
.negativeAckRedeliveryBackoff(BackoffPolicy.exponential(
106+
Duration.ofMillis(200), Duration.ofMillis(200)))
107+
.deadLetterPolicy(DeadLetterPolicy.of(1))
108+
.subscribe();
109+
110+
@Cleanup
111+
QueueConsumer<byte[]> dlqConsumer = v5Client.newQueueConsumer(Schema.bytes())
112+
.topic(defaultDlqTopic)
113+
.subscriptionName("dlq-watcher")
114+
.subscribe();
115+
116+
producer.newMessage().value("dead-default").send();
117+
118+
// maxRedeliverCount=1 → user sees deliveries with counts 0 and 1, the
119+
// third delivery (count=2) is intercepted.
120+
for (int i = 0; i < 2; i++) {
121+
Message<String> msg = consumer.receive(Duration.ofSeconds(5));
122+
assertNotNull(msg, "expected delivery #" + (i + 1));
123+
assertEquals(msg.value(), "dead-default");
124+
consumer.negativeAcknowledge(msg.id());
125+
}
126+
127+
Message<byte[]> dlqMsg = dlqConsumer.receive(Duration.ofSeconds(10));
128+
assertNotNull(dlqMsg, "message did not land on the default DLQ topic");
129+
assertEquals(new String(dlqMsg.value()), "dead-default");
130+
dlqConsumer.acknowledge(dlqMsg.id());
131+
}
132+
133+
/**
134+
* The V5 DLQ producer must preserve message key, user properties, and event time,
135+
* and attach origin metadata (REAL_TOPIC / REAL_SUBSCRIPTION / ORIGIN_MESSAGE_ID)
136+
* so DLQ consumers can correlate back to the source delivery.
137+
*/
138+
@Test
139+
public void testDlqMessagePreservesKeyPropertiesAndOriginMetadata() throws Exception {
140+
String topic = newScalableTopic(1);
141+
String dlqTopic = newScalableTopic(1);
142+
143+
@Cleanup
144+
Producer<String> producer = v5Client.newProducer(Schema.string())
145+
.topic(topic)
146+
.create();
147+
@Cleanup
148+
QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
149+
.topic(topic)
150+
.subscriptionName("dlq-meta-sub")
151+
.negativeAckRedeliveryBackoff(BackoffPolicy.exponential(
152+
Duration.ofMillis(200), Duration.ofMillis(200)))
153+
.deadLetterPolicy(new DeadLetterPolicy(1, null, dlqTopic, null))
154+
.subscribe();
155+
@Cleanup
156+
QueueConsumer<byte[]> dlqConsumer = v5Client.newQueueConsumer(Schema.bytes())
157+
.topic(dlqTopic)
158+
.subscriptionName("dlq-meta-watcher")
159+
.subscribe();
160+
161+
long eventTime = System.currentTimeMillis();
162+
producer.newMessage()
163+
.key("k-42")
164+
.value("payload")
165+
.eventTime(Instant.ofEpochMilli(eventTime))
166+
.property("user-prop", "user-val")
167+
.send();
168+
169+
for (int i = 0; i < 2; i++) {
170+
Message<String> msg = consumer.receive(Duration.ofSeconds(5));
171+
assertNotNull(msg);
172+
consumer.negativeAcknowledge(msg.id());
173+
}
174+
175+
Message<byte[]> dlqMsg = dlqConsumer.receive(Duration.ofSeconds(10));
176+
assertNotNull(dlqMsg, "message did not land on the DLQ topic");
177+
assertEquals(new String(dlqMsg.value()), "payload");
178+
assertEquals(dlqMsg.key().orElse(null), "k-42");
179+
assertEquals(dlqMsg.eventTime().map(Instant::toEpochMilli).orElse(-1L).longValue(),
180+
eventTime);
181+
182+
Map<String, String> props = dlqMsg.properties();
183+
assertEquals(props.get("user-prop"), "user-val", "user properties must be preserved");
184+
assertEquals(props.get(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC), topic);
185+
assertEquals(props.get(RetryMessageUtil.SYSTEM_PROPERTY_REAL_SUBSCRIPTION), "dlq-meta-sub");
186+
assertNotNull(props.get(RetryMessageUtil.PROPERTY_ORIGIN_MESSAGE_ID),
187+
"origin message id must be attached");
188+
assertTrue(props.get(RetryMessageUtil.PROPERTY_ORIGIN_MESSAGE_ID).matches("\\d+:\\d+(:.+)?"),
189+
"origin message id should look like a v4 message id, got: "
190+
+ props.get(RetryMessageUtil.PROPERTY_ORIGIN_MESSAGE_ID));
191+
192+
dlqConsumer.acknowledge(dlqMsg.id());
193+
}
194+
195+
/**
196+
* Verifies a single V5-side DLQ producer handles forwarding from messages that
197+
* arrived on different source segments — i.e., the V5 layer (not the v4
198+
* per-segment {@code ConsumerImpl}) owns the DLQ producer.
199+
*/
200+
@Test
201+
public void testDlqAcrossMultipleSourceSegments() throws Exception {
202+
String topic = newScalableTopic(3);
203+
String dlqTopic = newScalableTopic(1);
204+
205+
@Cleanup
206+
Producer<String> producer = v5Client.newProducer(Schema.string())
207+
.topic(topic)
208+
.create();
209+
@Cleanup
210+
QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
211+
.topic(topic)
212+
.subscriptionName("dlq-multi-sub")
213+
.negativeAckRedeliveryBackoff(BackoffPolicy.exponential(
214+
Duration.ofMillis(200), Duration.ofMillis(200)))
215+
.deadLetterPolicy(new DeadLetterPolicy(1, null, dlqTopic, null))
216+
.subscribe();
217+
@Cleanup
218+
QueueConsumer<byte[]> dlqConsumer = v5Client.newQueueConsumer(Schema.bytes())
219+
.topic(dlqTopic)
220+
.subscriptionName("dlq-multi-watcher")
221+
.subscribe();
222+
223+
// 6 keys → spread across the 3 segments.
224+
int n = 6;
225+
for (int i = 0; i < n; i++) {
226+
producer.newMessage().key("k-" + i).value("v-" + i).send();
227+
}
228+
229+
// For each source delivery (counts 0 and 1), nack everything; the third
230+
// delivery (count=2) of each message is intercepted and DLQ-forwarded.
231+
for (int round = 0; round < 2; round++) {
232+
for (int i = 0; i < n; i++) {
233+
Message<String> msg = consumer.receive(Duration.ofSeconds(5));
234+
assertNotNull(msg, "round " + round + " expected delivery #" + (i + 1));
235+
consumer.negativeAcknowledge(msg.id());
236+
}
237+
}
238+
239+
Set<String> dlqValues = new HashSet<>();
240+
for (int i = 0; i < n; i++) {
241+
Message<byte[]> dlqMsg = dlqConsumer.receive(Duration.ofSeconds(10));
242+
assertNotNull(dlqMsg, "expected DLQ message #" + (i + 1));
243+
dlqValues.add(new String(dlqMsg.value()));
244+
dlqConsumer.acknowledge(dlqMsg.id());
245+
}
246+
for (int i = 0; i < n; i++) {
247+
assertTrue(dlqValues.contains("v-" + i), "missing DLQ value v-" + i);
248+
}
90249
}
91250
}

pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ private CompletableFuture<Void> openTopic(String topicName, boolean retry) {
171171
};
172172
return dagWatch.start()
173173
.thenCompose(layout -> ScalableQueueConsumer.createAsyncImpl(
174-
client, v5Schema, perTopicConf(topicName), dagWatch, layout, sink))
174+
client, v5Schema, perTopicConf(topicName), dagWatch, layout, sink, null))
175175
.thenAccept(qc -> {
176176
if (closed) {
177177
qc.closeAsync();

pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,13 @@ final class QueueConsumerBuilderV5<T> implements QueueConsumerBuilder<T> {
4848
private String topicName;
4949
private NamespaceName namespaceName;
5050
private Map<String, String> propertyFilters;
51+
/**
52+
* V5-layer DLQ policy. Held here rather than translated into {@code conf} so the
53+
* V5 {@link ScalableQueueConsumer} can own a single DLQ producer (instead of v4's
54+
* one-per-segment design, which would also reject {@code topic://} scalable DLQ
55+
* targets).
56+
*/
57+
private DeadLetterPolicy dlqPolicy;
5158

5259
QueueConsumerBuilderV5(PulsarClientV5 client, Schema<T> v5Schema) {
5360
this.client = client;
@@ -88,7 +95,7 @@ public CompletableFuture<QueueConsumer<T>> subscribeAsync() {
8895
DagWatchClient dagWatch = new DagWatchClient(client.v4Client(), topic);
8996
return dagWatch.start()
9097
.thenCompose(initialLayout -> ScalableQueueConsumer.createAsync(
91-
client, v5Schema, conf, dagWatch, initialLayout));
98+
client, v5Schema, conf, dagWatch, initialLayout, dlqPolicy));
9299
}
93100

94101
@Override
@@ -190,18 +197,10 @@ public QueueConsumerBuilderV5<T> negativeAckRedeliveryBackoff(BackoffPolicy back
190197

191198
@Override
192199
public QueueConsumerBuilderV5<T> deadLetterPolicy(DeadLetterPolicy policy) {
193-
var builder = org.apache.pulsar.client.api.DeadLetterPolicy.builder()
194-
.maxRedeliverCount(policy.maxRedeliverCount());
195-
if (policy.retryLetterTopic() != null) {
196-
builder.retryLetterTopic(policy.retryLetterTopic());
197-
}
198-
if (policy.deadLetterTopic() != null) {
199-
builder.deadLetterTopic(policy.deadLetterTopic());
200-
}
201-
if (policy.initialSubscriptionName() != null) {
202-
builder.initialSubscriptionName(policy.initialSubscriptionName());
203-
}
204-
conf.setDeadLetterPolicy(builder.build());
200+
// Don't translate into conf — V5 owns DLQ at the consumer layer (see field
201+
// javadoc). The v4 per-segment DLQ would (a) duplicate producers, and
202+
// (b) reject scalable DLQ topics.
203+
this.dlqPolicy = policy;
205204
return this;
206205
}
207206

0 commit comments

Comments
 (0)