[improve][client] PIP-229: Add a common interface to get fields of MessageIdData #19414
Merged: BewareMyPower merged 2 commits into apache:master from BewareMyPower:bewaremypower/pip-229-msg-id-adv on Apr 4, 2023
142 changes: 142 additions & 0 deletions
pulsar-broker/src/test/java/org/apache/pulsar/client/api/CustomMessageIdTest.java
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,142 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.pulsar.client.api; | ||
|
|
||
| import static org.testng.Assert.assertEquals; | ||
| import static org.testng.Assert.assertFalse; | ||
| import static org.testng.Assert.assertNotNull; | ||
| import static org.testng.Assert.assertTrue; | ||
| import java.util.ArrayList; | ||
| import java.util.concurrent.TimeUnit; | ||
| import lombok.Cleanup; | ||
| import org.testng.annotations.AfterClass; | ||
| import org.testng.annotations.BeforeClass; | ||
| import org.testng.annotations.DataProvider; | ||
| import org.testng.annotations.Test; | ||
|
|
||
| @Test(groups = "broker-api") | ||
| public class CustomMessageIdTest extends ProducerConsumerBase { | ||
|
|
||
| @BeforeClass | ||
| @Override | ||
| protected void setup() throws Exception { | ||
| super.internalSetup(); | ||
| super.producerBaseSetup(); | ||
| } | ||
|
|
||
| @AfterClass(alwaysRun = true) | ||
| @Override | ||
| protected void cleanup() throws Exception { | ||
| super.internalCleanup(); | ||
| } | ||
|
|
||
| @DataProvider | ||
| public static Object[][] enableBatching() { | ||
| return new Object[][]{ | ||
| { true }, | ||
| { false } | ||
| }; | ||
| } | ||
|
|
||
| @Test | ||
| public void testSeek() throws Exception { | ||
| final var topic = "persistent://my-property/my-ns/test-seek-" + System.currentTimeMillis(); | ||
| @Cleanup final var producer = pulsarClient.newProducer(Schema.INT32).topic(topic).create(); | ||
| final var msgIds = new ArrayList<SimpleMessageIdImpl>(); | ||
| for (int i = 0; i < 10; i++) { | ||
| msgIds.add(new SimpleMessageIdImpl((MessageIdAdv) producer.send(i))); | ||
| } | ||
| @Cleanup final var consumer = pulsarClient.newConsumer(Schema.INT32) | ||
| .topic(topic).subscriptionName("sub").subscribe(); | ||
| consumer.seek(msgIds.get(6)); | ||
| final var msg = consumer.receive(3, TimeUnit.SECONDS); | ||
| assertNotNull(msg); | ||
| assertEquals(msg.getValue(), 7); | ||
| } | ||
|
|
||
| @Test(dataProvider = "enableBatching") | ||
| public void testAcknowledgment(boolean enableBatching) throws Exception { | ||
| final var topic = "persistent://my-property/my-ns/test-ack-" | ||
| + enableBatching + System.currentTimeMillis(); | ||
| final var producer = pulsarClient.newProducer(Schema.INT32) | ||
| .topic(topic) | ||
| .enableBatching(enableBatching) | ||
| .batchingMaxMessages(10) | ||
| .batchingMaxPublishDelay(300, TimeUnit.MILLISECONDS) | ||
| .create(); | ||
| final var consumer = pulsarClient.newConsumer(Schema.INT32) | ||
| .topic(topic) | ||
| .subscriptionName("sub") | ||
| .enableBatchIndexAcknowledgment(true) | ||
| .isAckReceiptEnabled(true) | ||
| .subscribe(); | ||
| for (int i = 0; i < 10; i++) { | ||
| producer.sendAsync(i); | ||
| } | ||
| final var msgIds = new ArrayList<SimpleMessageIdImpl>(); | ||
| for (int i = 0; i < 10; i++) { | ||
| final var msg = consumer.receive(); | ||
| final var msgId = new SimpleMessageIdImpl((MessageIdAdv) msg.getMessageId()); | ||
| msgIds.add(msgId); | ||
| if (enableBatching) { | ||
| assertTrue(msgId.getBatchIndex() >= 0 && msgId.getBatchSize() > 0); | ||
| } else { | ||
| assertFalse(msgId.getBatchIndex() >= 0 && msgId.getBatchSize() > 0); | ||
| } | ||
| } | ||
| consumer.acknowledgeCumulative(msgIds.get(8)); | ||
| consumer.redeliverUnacknowledgedMessages(); | ||
| final var msg = consumer.receive(3, TimeUnit.SECONDS); | ||
| assertNotNull(msg); | ||
| assertEquals(msg.getValue(), 9); | ||
| } | ||
|
|
||
| private record SimpleMessageIdImpl(long ledgerId, long entryId, int batchIndex, int batchSize) | ||
| implements MessageIdAdv { | ||
|
|
||
| public SimpleMessageIdImpl(MessageIdAdv msgId) { | ||
| this(msgId.getLedgerId(), msgId.getEntryId(), msgId.getBatchIndex(), msgId.getBatchSize()); | ||
| } | ||
|
|
||
| @Override | ||
| public byte[] toByteArray() { | ||
| return new byte[0]; // never used | ||
| } | ||
|
|
||
| @Override | ||
| public long getLedgerId() { | ||
| return ledgerId; | ||
| } | ||
|
|
||
| @Override | ||
| public long getEntryId() { | ||
| return entryId; | ||
| } | ||
|
|
||
| @Override | ||
| public int getBatchIndex() { | ||
| return batchIndex; | ||
| } | ||
|
|
||
| @Override | ||
| public int getBatchSize() { | ||
| return batchSize; | ||
| } | ||
| } | ||
| } |
122 changes: 122 additions & 0 deletions
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageIdAdv.java
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,122 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.pulsar.client.api; | ||
|
|
||
| import java.util.BitSet; | ||
|
|
||
| /** | ||
| * The {@link MessageId} interface provided for advanced users. | ||
| * <p> | ||
| * All built-in MessageId implementations should be able to be cast to MessageIdAdv. | ||
| * </p> | ||
| */ | ||
| public interface MessageIdAdv extends MessageId { | ||
|
|
||
| /** | ||
| * Get the ledger ID. | ||
| * | ||
| * @return the ledger ID | ||
| */ | ||
| long getLedgerId(); | ||
|
|
||
| /** | ||
| * Get the entry ID. | ||
| * | ||
| * @return the entry ID | ||
| */ | ||
| long getEntryId(); | ||
|
|
||
| /** | ||
| * Get the partition index. | ||
| * | ||
| * @return -1 if the message is from a non-partitioned topic, otherwise the non-negative partition index | ||
| */ | ||
| default int getPartitionIndex() { | ||
| return -1; | ||
| } | ||
|
|
||
| /** | ||
| * Get the batch index. | ||
| * | ||
| * @return -1 if the message is not in a batch | ||
| */ | ||
| default int getBatchIndex() { | ||
| return -1; | ||
| } | ||
|
|
||
| /** | ||
| * Get the batch size. | ||
| * | ||
| * @return 0 if the message is not in a batch | ||
| */ | ||
| default int getBatchSize() { | ||
| return 0; | ||
| } | ||
|
|
||
| /** | ||
| * Get the BitSet that indicates which messages in the batch have not been acknowledged. | ||
| * | ||
| * @implNote The message IDs of a batch should share a BitSet. For example, given 3 messages in the same batch whose | ||
| * size is 3, all message IDs of them should return "111" (i.e. a BitSet whose size is 3 and all bits are 1). If the | ||
| * 1st message has been acknowledged, the returned BitSet should become "011" (i.e. the 1st bit becomes 0). | ||
| * | ||
| * @return null if the message is a non-batched message | ||
| */ | ||
| default BitSet getAckSet() { | ||
| return null; | ||
| } | ||
|
|
||
| /** | ||
| * Get the message ID of the first chunk if the current message ID represents the position of a chunked message. | ||
| * | ||
| * @implNote A chunked message is distributed across different BookKeeper entries. The message ID of a chunked | ||
| * message is composed of two message IDs that represent positions of the first and the last chunk. The message ID | ||
| * itself represents the position of the last chunk. | ||
| * | ||
| * @return null if the message is not a chunked message | ||
| */ | ||
| default MessageIdAdv getFirstChunkMessageId() { | ||
| return null; | ||
| } | ||
|
|
||
| /** | ||
| * The default implementation of {@link Comparable#compareTo(Object)}. | ||
| */ | ||
| default int compareTo(MessageId o) { | ||
| if (!(o instanceof MessageIdAdv)) { | ||
| throw new UnsupportedOperationException("Unknown MessageId type: " | ||
| + ((o != null) ? o.getClass().getName() : "null")); | ||
| } | ||
| final MessageIdAdv other = (MessageIdAdv) o; | ||
| int result = Long.compare(this.getLedgerId(), other.getLedgerId()); | ||
| if (result != 0) { | ||
| return result; | ||
| } | ||
| result = Long.compare(this.getEntryId(), other.getEntryId()); | ||
| if (result != 0) { | ||
| return result; | ||
| } | ||
| // TODO: Correct the following compare logics, see https://github.com/apache/pulsar/pull/18981 | ||
| result = Integer.compare(this.getPartitionIndex(), other.getPartitionIndex()); | ||
| if (result != 0) { | ||
| return result; | ||
| } | ||
| return Integer.compare(this.getBatchIndex(), other.getBatchIndex()); | ||
| } | ||
| } | ||
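
The `@implNote` on `getAckSet()` can be made concrete with a small sketch. It uses only `java.util.BitSet`; the class `AckSetSketch` and its helpers `newAckSet` and `render` are hypothetical names for illustration and are not part of the Pulsar API. It assumes, as the note describes, that a set bit marks a message that has not been acknowledged yet and that every message ID of a batch refers to the same `BitSet` instance.

```java
import java.util.BitSet;

// Hypothetical illustration of the shared ack set described in the @implNote; not Pulsar code.
final class AckSetSketch {

    // Creates the ack set for a fresh batch: every bit is 1 because nothing is acknowledged yet.
    static BitSet newAckSet(int batchSize) {
        final BitSet ackSet = new BitSet(batchSize);
        ackSet.set(0, batchSize);
        return ackSet;
    }

    // Renders the first batchSize bits in index order, matching the "111" / "011" notation of the note.
    static String render(BitSet ackSet, int batchSize) {
        final StringBuilder sb = new StringBuilder(batchSize);
        for (int i = 0; i < batchSize; i++) {
            sb.append(ackSet.get(i) ? '1' : '0');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        final int batchSize = 3;
        // One BitSet shared by all three message IDs of the batch.
        final BitSet shared = newAckSet(batchSize);
        System.out.println(render(shared, batchSize));
        // Acknowledge the 1st message of the batch: its bit is cleared for every holder of the set.
        shared.clear(0);
        System.out.println(render(shared, batchSize));
    }
}
```

Because the set is shared, clearing one bit is visible through every message ID of the batch, which is presumably why the note asks implementations to share a single instance rather than copy it.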
Oh, this is wrong. We should not expose `ledgerID`/`entryID` in any public interface. We are opening a can of worms, and the Pulsar community has prevented this many times before. We can't check every PR every time, but this PR must be reverted to avoid exposing internal details of Pulsar. It will be difficult to support this API in the future if the storage layer is no longer BookKeeper or if the bookie API changes.
This proposal was discussed and revised into the current version of this PR in https://lists.apache.org/thread/25rzflmkfmvxhf3my0ombnbpv7bvgy32 and received 3 binding +1s and 2 non-binding +1s in https://lists.apache.org/thread/kmjq6lf1f11mf6qb8onhnlr17n27fcv4.
The PIP is here: #18950
As I've explained in the PIP:
Currently `MessageId` is still exposed to users, and it does not expose these internal details. The new `MessageIdAdv` interface is only used for convenience. Users should write the following code after this PR:
Before this PR, there is much code like:
You can also see these code references in the changes of this PR. If you're going to revert this PIP, please at least start a discussion on the mailing list to hear more voices.
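
The two snippets this comment refers to are not reproduced on this page. As a rough sketch of the contrast it describes (not the author's actual code), the example below compares branching on concrete classes with a single cast to a common interface. Everything in it is a local stand-in so that it compiles on its own: the real `MessageId` and `MessageIdAdv` live in `org.apache.pulsar.client.api`, and `PlainId`, `BatchedId`, and `FieldAccessSketch` are hypothetical names that do not exist in Pulsar.

```java
// Local stand-ins so the sketch compiles on its own. They are NOT the real Pulsar types.
interface MessageId { }

// Mirrors two of the accessors declared by MessageIdAdv in this PR.
interface MessageIdAdv extends MessageId {
    long getLedgerId();
    long getEntryId();
}

// Hypothetical concrete implementation standing in for an internal client class.
final class PlainId implements MessageIdAdv {
    private final long ledgerId;
    private final long entryId;

    PlainId(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    @Override public long getLedgerId() { return ledgerId; }
    @Override public long getEntryId() { return entryId; }
}

// Second hypothetical implementation, standing in for a batched message ID class.
final class BatchedId implements MessageIdAdv {
    private final long ledgerId;
    private final long entryId;
    private final int batchIndex;

    BatchedId(long ledgerId, long entryId, int batchIndex) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.batchIndex = batchIndex;
    }

    @Override public long getLedgerId() { return ledgerId; }
    @Override public long getEntryId() { return entryId; }
    int getBatchIndex() { return batchIndex; }
}

final class FieldAccessSketch {

    // Style the comment attributes to existing code: one branch per concrete class.
    static long ledgerIdByTypeCheck(MessageId id) {
        if (id instanceof PlainId) {
            return ((PlainId) id).getLedgerId();
        } else if (id instanceof BatchedId) {
            return ((BatchedId) id).getLedgerId();
        }
        throw new IllegalArgumentException("Unknown MessageId type: " + id);
    }

    // Style the PR aims for: a single cast to the common interface.
    static long ledgerIdByInterface(MessageId id) {
        return ((MessageIdAdv) id).getLedgerId();
    }

    public static void main(String[] args) {
        final MessageId id = new BatchedId(5L, 8L, 2);
        System.out.println(ledgerIdByTypeCheck(id));
        System.out.println(ledgerIdByInterface(id));
    }
}
```

The first method has to be extended whenever a new implementation appears, while the second keeps working for any class that implements the interface, which is the convenience argument made in the comment.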
We should not let users use `ledgerId`/`entryId`. What if we replace BookKeeper with some other storage? In that case, such codebases will break. Exposing internal data and encouraging users to depend on it is not a good design decision. I didn't review this PIP earlier, but this discussion has happened multiple times, and the Pulsar community rejected such proposals in the past. Since we have not done any release yet, it's better to revert this PR soon.
That's an idealized assumption. In the real world, there is a lot of code that reads ledgerId/entryId from the `MessageId`. That code has to hack into the `pulsar-client` module and cast `MessageId` to a specific implementation.
When users need to access the detailed fields, they should have assumed the storage is BookKeeper. Otherwise, there is no need to get the storage details from the `MessageId`.
The fact is that Pulsar provides a hard-to-use interface, `MessageId`. Then users, including ecosystem developers, have to dig into the implementation details. I don't like saying that these developers use Pulsar in a wrong way. I'd like to say they are limited by the poor interface that Pulsar provided. Keeping the limitation is harmful to the Pulsar community and could make ecosystem applications flaky and easy to break.
The point is, now it's open sourced and many more external applications need such ability to access these internal fields. When Pulsar was not open sourced, the use cases were limited. But things are different now.
I don't have much time to keep debating this. It has already taken me too much time to clarify the motivation and find the code examples.
Let's use dev@apache.org to make decisions and hear more voices.
I don't think Pulsar has a poor interface or any limitation. `MessageId` is just a reference handed out by Pulsar, and it should have correct serialization and deserialization methods. I don't know of any system that encourages users to extract the message ID and depend on its internals, or that provides a contract for doing so.
So I don't think it's fair to say that Pulsar has limitations or poor APIs or interfaces. Abstraction is an important part of an API contract, and that's what Pulsar is doing. Why would keeping `MessageId` abstract from the user break applications? That is exactly why user applications should not depend on what lies behind the abstraction and should not try to hack around it.
It's not about use cases; it's about learning to do the right thing.