[feat][broker] PIP-475: synthetic-layout lookup for regular topics#25822
[feat][broker] PIP-475: synthetic-layout lookup for regular topics#25822merlimat wants to merge 2 commits into
Conversation
Adds the broker-side foundation for PIP-475 (regular-to-scalable topic migration). The scalable-topic lookup session now accepts persistent:// and short-form topic names in addition to topic://, and synthesises a layout that wraps an existing regular (partitioned or non-partitioned) topic as one or more special segments. V5 clients can then operate against any topic through the scalable surface, even before it has been migrated. Schema additions: * SegmentInfoProto.underlying_topic_name — when set, the segment wraps the named persistent://... topic instead of having its own segment://... URI. Used by the synthetic-layout response. * CommandScalableTopicUpdate.resolved_topic_name — the canonical topic://t/n/x identity, set on every successful response so a client that looked up via persistent://... or short-form learns the resolved name. Java surface mirrors the wire additions on SegmentInfo (with a new activeSpecial(...) factory and isSpecial() check) and a new TopicName.toScalableTopic() helper used by the broker to derive the canonical identity regardless of input form. Broker behaviour: * ServerCnx rejects non-persistent:// scalable-topic lookups early. * DagWatchSession.start() falls back to a synthetic layout for persistent:// input when no scalable metadata exists at the canonical path. Non-partitioned → 1 special segment covering the full hash range; partitioned with N partitions → N special segments wrapping each persistent://...-partition-K with equal-width contiguous hash ranges. The metadata-store watch remains active so a later migration that writes scalable metadata to the same path is observed and transparently replaces the synthetic layout. * pushUpdate emits the canonical topic://... resolvedTopicName, plus the new special-segment marker when set. Tests: * DagWatchSessionTest covers synthetic layouts for both non-partitioned and partitioned regular topics, and verifies the wire response carries resolved_topic_name and underlying_topic_name. * CommandsScalableTopicTest verifies resolved_topic_name round-trips through the new Commands.newScalableTopicUpdate signature.
|
These are findings from a local Claude Code review. Please check before merging — they are suggestions, not blockers, and some may be intentional design choices. Findings
Non-issues / nits
|
lhotari
left a comment
There was a problem hiding this comment.
LGTM. Please check the local Claude Code review findings before merging.
| * @param sealedAtEpoch DAG epoch when sealed (-1 if still active) | ||
| * @param createdAtMs wall-clock millis at creation time | ||
| * @param sealedAtMs wall-clock millis at seal time (-1 if still active) | ||
| * <p>A segment may be a <i>special segment</i> — one that wraps an existing |
There was a problem hiding this comment.
could we call "special segment" something else? "special" can be vague and having a more meaningful concept would help in naming of other fields and methods. Perhaps "adaption segment", "adaptor segment", "adapted segment", "migration segment", "migrated segment", etc. to capture the meaning?
For underlyingTopicName, perhaps it could be "adaptedV4TopicName" ?
There was a problem hiding this comment.
Good call — renamed throughout. I went with "legacy segment" and legacyTopicName: a legacy segment is one that is not managed by the scalable-topic controller and has no segment://... URI of its own — it wraps an existing, externally managed persistent://... topic. The Javadoc spells out that externally-managed framing. Applied to the proto field (underlying_topic_name → legacy_topic_name), SegmentInfo (activeSpecial → activeLegacy, isSpecial → isLegacy), and the V5 SDK in the follow-up PR. Pushed in 74fab13.
…t lookup Rename + bug fixes from the PR apache#25822 review. Rename "special segment" → "legacy segment": * The concept was vague. A legacy segment is one not managed by the scalable-topic controller — it wraps an existing, externally managed persistent:// topic. Renamed the proto field (underlying_topic_name → legacy_topic_name), SegmentInfo (underlyingTopicName → legacyTopicName, activeSpecial → activeLegacy, isSpecial → isLegacy), and all comments. Bug fixes: * TopicName.toScalableTopic() now strips any -partition-K suffix, so persistent://t/n/x-partition-3 resolves to topic://t/n/x rather than topic://t/n/x-partition-3. * DagWatchSession.start() rejects an individual-partition lookup target up front (the synthetic layout models the whole partitioned topic; wrapping one partition would otherwise produce nonsensical -partition-K-partition-J names). * Synthetic partitioned layout no longer throws when partitions exceeds the 16-bit hash space (width would be 0 → HashRange invariant violation). Ranges are cosmetic for synthetic layouts (routing is mod-N over segment_id), so a degenerate clamped slot is used past 65536 partitions. * SegmentInfo.isLegacy() rejects an empty legacyTopicName, not just null. * Commands.newScalableTopicUpdate guards a null resolvedTopicName (the field is optional; the lightproto setter would otherwise NPE). Docs/tests: * Documented that a synthetic layout is built even for a non-existent topic — existence/auto-create is decided downstream by the namespace policy when the producer/consumer attaches, matching v4. * Documented that synthetic hash ranges are cosmetic (mod-N routing). * Added TopicNameTest.testToScalableTopic (topic://, persistent://, short-form, and -partition-K stripping — also exercises short-form input per the review). * Added DagWatchSessionTest.testStartRejectsIndividualPartitionInput. * Added CommandsScalableTopicTest coverage for the null-resolved-name guard. Confirmed non-issue: V1 names (with a cluster component) are rejected by TopicName.get() before toScalableTopic() can see them.
|
Thanks for the thorough review @lhotari. Addressed in 74fab13: Naming — renamed "special segment" → "legacy segment" ( Bugs
Quality / docs Confirmed non-issue Deferred |
Summary
First implementation PR for PIP-475: Regular-to-Scalable Topic Migration. Lays the broker-side foundation: the scalable-topic lookup session now accepts
persistent://and short-form topic names in addition totopic://, and synthesises a layout that wraps an existing regular (partitioned or non-partitioned) topic as one or more special segments. V5 clients can then operate against any topic through the scalable surface, even before it has been migrated — the SDK consumption of the synthetic layout will land in a follow-up PR.Wire-format additions
SegmentInfoProto.underlying_topic_name— when set, the segment wraps the namedpersistent://...topic instead of having its ownsegment://...URI.CommandScalableTopicUpdate.resolved_topic_name— the canonicaltopic://t/n/xidentity, set on every successful response so a client that looked up viapersistent://...or short-form learns the resolved name.Java surface
SegmentInfo.underlyingTopicName+activeSpecial(...)factory +isSpecial()check.TopicName.toScalableTopic()— derive the canonical scalable identity from a name in any input domain.Broker behaviour
ServerCnxrejectsnon-persistent://scalable-topic lookups early.DagWatchSession.start()falls back to a synthetic layout forpersistent://input when no scalable metadata exists at the canonical path:persistent://...-partition-Kwith equal-width contiguous hash ranges.pushUpdateemits the canonicaltopic://...resolvedTopicName, plus the new special-segment marker when set.Test plan
DagWatchSessionTestcovers synthetic layouts for both non-partitioned and partitioned regular topics, and verifies the wire response carriesresolved_topic_nameandunderlying_topic_name(17/17 passing).CommandsScalableTopicTestverifiesresolved_topic_nameround-trips through the newCommands.newScalableTopicUpdatesignature (8/8 passing).ScalableTopicControllerTestregression check (33/33 passing).