Skip to content

[feat] PIP-468: V5 dead letter queue with scalable DLQ topic#25652

Merged
merlimat merged 2 commits intoapache:masterfrom
merlimat:st-v5-dlq
May 2, 2026
Merged

[feat] PIP-468: V5 dead letter queue with scalable DLQ topic#25652
merlimat merged 2 commits intoapache:masterfrom
merlimat:st-v5-dlq

Conversation

@merlimat
Copy link
Copy Markdown
Contributor

@merlimat merlimat commented May 2, 2026

Summary

Move DLQ ownership from v4 per-segment ConsumerImpls up to the V5 ScalableQueueConsumer so that:

  • A single DLQ producer fans messages from every source segment, instead of one producer per segment.
  • The DLQ topic itself can be a scalable (topic://) topic. The v4 ConsumerImpl creates its DLQ producer via client.newProducer(...), which rejects the topic:// domain — V5 now uses its own producer builder, so a scalable DLQ "just works".
  • The default DLQ topic name (when the policy doesn't set one) is topic://<tenant>/<ns>/<source-local>-DLQ.

ScalableTopicProducer's send path is now fully async: segment producers are stored as CompletableFuture so callers running on a netty IO thread (e.g. the V5 DLQ dispatch off a v4 receive callback) can chain instead of blocking on .get(). A per-segment dispatch chain serializes v4 sendAsync calls in user-call order; flushAsync awaits the snapshot of in-flight send futures.

Test plan

  • V5DeadLetterPolicyTest:
    • testMessageGoesToScalableDlqWhenExplicitlyConfigured — explicit scalable DLQ target.
    • testMessageGoesToDefaultScalableDlqTopic — default DLQ name (<source>-DLQ).
    • testDlqMessagePreservesKeyPropertiesAndOriginMetadata — verifies key, user properties, eventTime, and origin metadata (REAL_TOPIC / REAL_SUBSCRIPTION / ORIGIN_MESSAGE_ID) are attached.
    • testDlqAcrossMultipleSourceSegments — 3-segment source, 6 keys, single shared V5-side DLQ producer.
  • Full V5 test suite (org.apache.pulsar.client.api.v5.*) green: 113/113 pass.
  • pulsar-client-v5 and pulsar-broker checkstyle clean.

Move DLQ ownership from v4 per-segment ConsumerImpls up to the V5
ScalableQueueConsumer so that:

- A single DLQ producer fans messages from every source segment,
  rather than one producer per segment.
- The DLQ topic itself can be a scalable (topic://) topic. The v4
  ConsumerImpl creates its DLQ producer via client.newProducer(...),
  which rejects the topic:// domain — V5 now uses its own producer
  builder, so a scalable DLQ "just works".
- The default DLQ topic name (when the policy doesn't set one) is
  topic://<tenant>/<ns>/<source-local>-DLQ.

ScalableTopicProducer's send path is now fully async: segment
producers are stored as CompletableFuture so callers running on a
netty IO thread (e.g. the V5 DLQ dispatch off a v4 receive callback)
can chain instead of blocking on .get(). A per-segment dispatch chain
serializes v4 sendAsync calls in user-call order; flushAsync awaits
the snapshot of in-flight send futures.

Tests cover explicit + default-named scalable DLQ targets, origin
metadata preservation (REAL_TOPIC / REAL_SUBSCRIPTION /
ORIGIN_MESSAGE_ID + key + properties + eventTime), and DLQ forwarding
across multiple source segments.
@merlimat merlimat changed the title PIP-468: V5 dead letter queue with scalable DLQ topic [feat] PIP-468: V5 dead letter queue with scalable DLQ topic May 2, 2026
Copy link
Copy Markdown
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

# Conflicts:
#	pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
#	pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
@merlimat merlimat merged commit ebec5ce into apache:master May 2, 2026
43 checks passed
@merlimat merlimat deleted the st-v5-dlq branch May 2, 2026 16:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants