Skip to content

[feat][broker] PIP-475: synthetic-layout lookup for regular topics#25822

Open
merlimat wants to merge 2 commits into
apache:masterfrom
merlimat:pip-475-impl-1
Open

[feat][broker] PIP-475: synthetic-layout lookup for regular topics#25822
merlimat wants to merge 2 commits into
apache:masterfrom
merlimat:pip-475-impl-1

Conversation

@merlimat
Copy link
Copy Markdown
Contributor

Summary

First implementation PR for PIP-475: Regular-to-Scalable Topic Migration. Lays the broker-side foundation: the scalable-topic lookup session now accepts persistent:// and short-form topic names in addition to topic://, and synthesises a layout that wraps an existing regular (partitioned or non-partitioned) topic as one or more special segments. V5 clients can then operate against any topic through the scalable surface, even before it has been migrated — the SDK consumption of the synthetic layout will land in a follow-up PR.

Wire-format additions

  • SegmentInfoProto.underlying_topic_name — when set, the segment wraps the named persistent://... topic instead of having its own segment://... URI.
  • CommandScalableTopicUpdate.resolved_topic_name — the canonical topic://t/n/x identity, set on every successful response so a client that looked up via persistent://... or short-form learns the resolved name.

Java surface

  • SegmentInfo.underlyingTopicName + activeSpecial(...) factory + isSpecial() check.
  • TopicName.toScalableTopic() — derive the canonical scalable identity from a name in any input domain.

Broker behaviour

  • ServerCnx rejects non-persistent:// scalable-topic lookups early.
  • DagWatchSession.start() falls back to a synthetic layout for persistent:// input when no scalable metadata exists at the canonical path:
    • Non-partitioned → 1 special segment covering the full hash range.
    • Partitioned with N partitions → N special segments wrapping each persistent://...-partition-K with equal-width contiguous hash ranges.
  • The metadata-store watch remains active so a later migration that writes scalable metadata to the same path is observed and transparently replaces the synthetic layout.
  • pushUpdate emits the canonical topic://... resolvedTopicName, plus the new special-segment marker when set.

Test plan

  • DagWatchSessionTest covers synthetic layouts for both non-partitioned and partitioned regular topics, and verifies the wire response carries resolved_topic_name and underlying_topic_name (17/17 passing).
  • CommandsScalableTopicTest verifies resolved_topic_name round-trips through the new Commands.newScalableTopicUpdate signature (8/8 passing).
  • ScalableTopicControllerTest regression check (33/33 passing).
  • End-to-end V5 SDK test against synthetic layouts — lands with the SDK consumption PR.

Adds the broker-side foundation for PIP-475 (regular-to-scalable topic
migration). The scalable-topic lookup session now accepts persistent://
and short-form topic names in addition to topic://, and synthesises a
layout that wraps an existing regular (partitioned or non-partitioned)
topic as one or more special segments. V5 clients can then operate
against any topic through the scalable surface, even before it has been
migrated.

Schema additions:
 * SegmentInfoProto.underlying_topic_name — when set, the segment wraps
   the named persistent://... topic instead of having its own
   segment://... URI. Used by the synthetic-layout response.
 * CommandScalableTopicUpdate.resolved_topic_name — the canonical
   topic://t/n/x identity, set on every successful response so a
   client that looked up via persistent://... or short-form learns
   the resolved name.

Java surface mirrors the wire additions on SegmentInfo (with a new
activeSpecial(...) factory and isSpecial() check) and a new
TopicName.toScalableTopic() helper used by the broker to derive the
canonical identity regardless of input form.

Broker behaviour:
 * ServerCnx rejects non-persistent:// scalable-topic lookups early.
 * DagWatchSession.start() falls back to a synthetic layout for
   persistent:// input when no scalable metadata exists at the
   canonical path. Non-partitioned → 1 special segment covering the
   full hash range; partitioned with N partitions → N special segments
   wrapping each persistent://...-partition-K with equal-width
   contiguous hash ranges. The metadata-store watch remains active so
   a later migration that writes scalable metadata to the same path
   is observed and transparently replaces the synthetic layout.
 * pushUpdate emits the canonical topic://... resolvedTopicName, plus
   the new special-segment marker when set.

Tests:
 * DagWatchSessionTest covers synthetic layouts for both
   non-partitioned and partitioned regular topics, and verifies the
   wire response carries resolved_topic_name and underlying_topic_name.
 * CommandsScalableTopicTest verifies resolved_topic_name round-trips
   through the new Commands.newScalableTopicUpdate signature.
@lhotari lhotari self-requested a review May 20, 2026 16:02
@lhotari
Copy link
Copy Markdown
Member

lhotari commented May 20, 2026

These are findings from a local Claude Code review. Please check before merging — they are suggestions, not blockers, and some may be intentional design choices.

Findings

  1. [BUG] toScalableTopic() doesn't strip the -partition-K suffixpulsar-common/.../TopicName.java
    getEncodedLocalName() returns the full local name including a partition suffix. A lookup against persistent://t/n/x-partition-3 resolves to topic://t/n/x-partition-3 rather than the canonical topic://t/n/x. The PR description and Javadoc promise "the canonical topic://t/n/x identity", so either the contract is wrong or the implementation is. Suggested fix: use TopicName.get(getPartitionedTopicName()).getLocalName() (or short-circuit on isPartitioned()) before building the new name.

  2. [BUG] Partitioned-input synthetic layout chains partition suffixesDagWatchSession#buildSyntheticResponse
    topicName.getPartition(k) only short-circuits when the existing suffix matches -partition-K for the same index K. If a client supplies persistent://t/n/x-partition-0, getPartition(1) produces persistent://t/n/x-partition-0-partition-1 (a non-existent topic). Combined with javascript client #1, partition-specific inputs are accepted but produce nonsense underlying-topic names. Either reject inputs where topicName.isPartitioned() early, or normalize to the base partitioned-topic name before calling getPartition(k).

  3. [BUG] HashRange validation throws when partitions > 65536DagWatchSession#buildSyntheticResponse
    int width = 0x10000 / partitions; becomes 0 once partitions exceed the 16-bit hash space, and the next line computes end = start + width - 1 = -1, which fails HashRange's start <= end invariant and surfaces as an IllegalArgumentException rather than a clean lookup error. Practical exposure is low (broker maxNumPartitionsPerPartitionedTopic is typically smaller), but the failure mode is ugly. Either cap partitions or assign every range as a degenerate single-hash slot (which works since "routing is mod-N over segment_id" anyway, per the existing comment).

  4. [QUALITY] Synthetic layout is built for topics that don't existDagWatchSession#buildSyntheticResponse
    fetchPartitionedTopicMetadataAsync returns PartitionedTopicMetadata(0) for both non-partitioned topics and topics that don't exist. The pre-PR behavior was to fail with IllegalStateException(\"Scalable topic not found: ...\"); the new behavior is to return a synthetic single-segment layout pointing at a possibly non-existent persistent://... topic. This may be intentional (auto-create on connect downstream), but it's a real behavior change. A quick checkTopicExists/namespaceExists guard for the non-partitioned case would tighten this; at minimum the Javadoc should call it out.

  5. [QUALITY] SegmentInfo.isSpecial() accepts empty string
    underlyingTopicName \!= null returns true for the empty string. Factory methods guard against this, but the record's canonical constructor is public; one stray \"\" would silently break the special/regular distinction. Cheap to harden: return underlyingTopicName \!= null && \!underlyingTopicName.isEmpty(); and/or reject blanks in the canonical constructor.

  6. [QUALITY] Commands.newScalableTopicUpdate doesn't handle a null resolvedTopicName
    Lightproto's setResolvedTopicName(null) will NPE rather than leave the field unset. Today all callers pass a non-null value, but the signature accepts String without @Nonnull and the proto field is optional. Either annotate the param or guard the setter (if (resolvedTopicName \!= null) ...).

  7. [QUALITY] Cosmetic hash ranges in synthetic partitioned layout
    The comment correctly notes ranges are "cosmetic — routing for synthetic layouts is mod-N over segment_id". That contract lives entirely in this method plus a future SDK PR. Worth a // see PIP-475 §X link or shared constant so broker- and client-side conventions don't drift. Specifically: if the SDK ever consults hash_start/hash_end for synthetic layouts, partition counts that don't divide 0x10000 evenly will misroute by one bucket per cycle.

  8. [QUALITY] V1 namespace handling in toScalableTopic() not asserted
    For a V1-style name like persistent://tenant/cluster/ns/x, getNamespace() returns the 3-segment form tenant/cluster/ns, producing topic://tenant/cluster/ns/x. TopicName.get() should round-trip that, but there are no tests exercising it. A one-line unit test (or an explicit guard against V1) would prevent surprise.

  9. [QUALITY] pushUpdate race with watch-fired updates (pre-existing, but widened here)
    The watch is registered before the initial metadata fetch, so an onNotification for the canonical path can deliver a pushUpdate(...) to the client before ServerCnx finishes wiring up start().thenAcceptAsync(session::pushUpdate, ...). Not introduced by this PR, but the new synthetic-layout path widens the window because the initial response now does an extra fetchPartitionedTopicMetadataAsync round-trip. Worth tracking even if deferred.

  10. [INTENT MISMATCH — minor] Short-form input not exercised by tests
    The PR description claims short-form topic names are accepted. That works because TopicName.get() already normalizes short forms to persistent://public/default/<name>, but the PR itself adds no test for it. Consider one unit test asserting that a short-form input yields the expected resolvedTopicName.

Non-issues / nits

  • Proto field number 11 for underlying_topic_name is the next free number — no conflict.
  • LinkedHashMap for segments preserves insertion order so partition-K → segment-K mapping is stable on the wire.
  • The watch-replaces-synthetic mechanism is correct: onMetadataChanged routes through buildResponse(metadata) (the real path), so a later migration transparently swaps the synthetic layout.
  • ServerCnx non-persistent rejection happens before isRunning/resources checks — fine, the early reject leaks no information that isn't already public.

Copy link
Copy Markdown
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Please check the local Claude Code review findings before merging.

* @param sealedAtEpoch DAG epoch when sealed (-1 if still active)
* @param createdAtMs wall-clock millis at creation time
* @param sealedAtMs wall-clock millis at seal time (-1 if still active)
* <p>A segment may be a <i>special segment</i> — one that wraps an existing
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we call "special segment" something else? "special" can be vague and having a more meaningful concept would help in naming of other fields and methods. Perhaps "adaption segment", "adaptor segment", "adapted segment", "migration segment", "migrated segment", etc. to capture the meaning?

For underlyingTopicName, perhaps it could be "adaptedV4TopicName" ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call — renamed throughout. I went with "legacy segment" and legacyTopicName: a legacy segment is one that is not managed by the scalable-topic controller and has no segment://... URI of its own — it wraps an existing, externally managed persistent://... topic. The Javadoc spells out that externally-managed framing. Applied to the proto field (underlying_topic_namelegacy_topic_name), SegmentInfo (activeSpecialactiveLegacy, isSpecialisLegacy), and the V5 SDK in the follow-up PR. Pushed in 74fab13.

…t lookup

Rename + bug fixes from the PR apache#25822 review.

Rename "special segment" → "legacy segment":
 * The concept was vague. A legacy segment is one not managed by the
   scalable-topic controller — it wraps an existing, externally managed
   persistent:// topic. Renamed the proto field
   (underlying_topic_name → legacy_topic_name), SegmentInfo
   (underlyingTopicName → legacyTopicName, activeSpecial → activeLegacy,
   isSpecial → isLegacy), and all comments.

Bug fixes:
 * TopicName.toScalableTopic() now strips any -partition-K suffix, so
   persistent://t/n/x-partition-3 resolves to topic://t/n/x rather than
   topic://t/n/x-partition-3.
 * DagWatchSession.start() rejects an individual-partition lookup target
   up front (the synthetic layout models the whole partitioned topic;
   wrapping one partition would otherwise produce nonsensical
   -partition-K-partition-J names).
 * Synthetic partitioned layout no longer throws when partitions exceeds
   the 16-bit hash space (width would be 0 → HashRange invariant
   violation). Ranges are cosmetic for synthetic layouts (routing is
   mod-N over segment_id), so a degenerate clamped slot is used past
   65536 partitions.
 * SegmentInfo.isLegacy() rejects an empty legacyTopicName, not just null.
 * Commands.newScalableTopicUpdate guards a null resolvedTopicName (the
   field is optional; the lightproto setter would otherwise NPE).

Docs/tests:
 * Documented that a synthetic layout is built even for a non-existent
   topic — existence/auto-create is decided downstream by the namespace
   policy when the producer/consumer attaches, matching v4.
 * Documented that synthetic hash ranges are cosmetic (mod-N routing).
 * Added TopicNameTest.testToScalableTopic (topic://, persistent://,
   short-form, and -partition-K stripping — also exercises short-form
   input per the review).
 * Added DagWatchSessionTest.testStartRejectsIndividualPartitionInput.
 * Added CommandsScalableTopicTest coverage for the null-resolved-name
   guard.

Confirmed non-issue: V1 names (with a cluster component) are rejected by
TopicName.get() before toScalableTopic() can see them.
@merlimat
Copy link
Copy Markdown
Contributor Author

Thanks for the thorough review @lhotari. Addressed in 74fab13:

Naming — renamed "special segment" → "legacy segment" (legacyTopicName, isLegacy()), per the inline thread. A legacy segment is one not managed by the scalable-topic controller; it wraps an existing, externally-managed persistent:// topic.

Bugs

  1. toScalableTopic() partition suffix — now strips -partition-K (via getPartitionedTopicName()), so persistent://t/n/x-partition-3 resolves to topic://t/n/x. Covered by TopicNameTest.testToScalableTopic.
  2. Partition-name input chainingDagWatchSession.start() now rejects an individual-partition lookup target up front. Covered by testStartRejectsIndividualPartitionInput.
  3. partitions > 65536 — synthetic ranges are cosmetic (routing is mod-N over segment_id), so past the 16-bit hash space a degenerate clamped slot is used instead of letting width hit 0 and violating the HashRange invariant.
  4. isLegacy() empty string — now != null && !isEmpty() (both broker and SDK side).
  5. newScalableTopicUpdate(null, ...) — guards the optional resolved_topic_name setter against null. Covered by a new CommandsScalableTopicTest case.

Quality / docs
4. Synthetic layout for non-existent topics — confirmed intentional and documented: existence/auto-create is decided downstream by the namespace policy when the producer/consumer attaches, matching v4 semantics.
7. Cosmetic hash ranges — documented that synthetic ranges are never consulted for routing (mod-N over segment_id).
10. Short-form input — now exercised in testToScalableTopic.

Confirmed non-issue
8. V1 namesTopicName.get() rejects V1 names (with a cluster component) before toScalableTopic() can see them; existing testToFullTopicName already asserts this.

Deferred
9. pushUpdate race — pre-existing, not introduced here; tracking separately.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants