Add parition reader to solve the Reader fragmented read data.#696
Add parition reader to solve the Reader fragmented read data.#696EAHITechnology wants to merge 6 commits into
Conversation
Add parition reader to solve the Reader fragmented read data.
|
@EAHITechnology Nick work, can you add test case for this feature? |
| } | ||
|
|
||
| return r.pc.Seek(mid) | ||
| return r.consumers[mid.partitionIdx].Seek(mid) |
There was a problem hiding this comment.
could there be an invalid partition (out of range) index here?
There was a problem hiding this comment.
why?The number of Consumers is determined by the TopicPartitions() function. If there is a problem here, then the Patition_Consumer is also problematic.
| "time" | ||
| ) | ||
|
|
||
| func (r *reader) internalTopicReadToPartitions() error { |
There was a problem hiding this comment.
What is this function doing I'm a little confused by the name. Also, this looks like a lot of duplication code from the consumer partitions is there any way to refactor/combine in to helper functions to avoid duplication?
There was a problem hiding this comment.
According to issues, we need to subscribe to partition_consumer separately. The meaning of this function is to convert topic into partition_topic for subscription
There was a problem hiding this comment.
What's more difficult is that reader requires a lot of specific parameters. After thinking about it, I decided to encapsulate directly. If not, this will swell the member variables of the reader.
There was a problem hiding this comment.
For example, the Reader will process startMessageID, but the consumer will not.
| return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(r.consumers[idx].startMessageID.messageID) | ||
| } | ||
| for idx := range r.consumers { | ||
| if moreMessagesCheck(idx) { |
There was a problem hiding this comment.
Let's pass the consumer and the lastMessageInBroker (instead of an index) consumerHasMoreMessages(consumer Consumer, lastMessageInBroker) Then the logic can be Tested.
There was a problem hiding this comment.
Done.It sounds pretty good
wolfstudy
left a comment
There was a problem hiding this comment.
Thanks @EAHITechnology work for this, nice work just a little comments, please check
| @@ -0,0 +1,142 @@ | |||
| package pulsar | |||
There was a problem hiding this comment.
Please add license header for new file?
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
|
|
||
| subscriptionName := r.options.SubscriptionRolePrefix | ||
| if subscriptionName == "" { | ||
| subscriptionName = "reader" |
There was a problem hiding this comment.
In here, maybe we can define a string constant
const ReaderSubNamePrefix = "reader"
| metadata: r.options.Properties, | ||
| replicateSubscriptionState: false, | ||
| startMessageID: startMessageID, | ||
| subscriptionMode: durable, |
There was a problem hiding this comment.
The reader use subscriptionMode is nonDurable?
There was a problem hiding this comment.
It seems that we have lost the attribute of the startMessageIDInclusive field?
|
@EAHITechnology Please add unit test case for this change? We can use the following code to create a topic with 3 partitions |
list: