-
Notifications
You must be signed in to change notification settings - Fork 53
Adding maxBytesPerTrigger tag for Pulsar Admission Control #151
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
38 commits
Select commit
Hold shift + click to select a range
66667d4
init
ericm-db 9d5ceef
correcting bytesLeftInLedger calculation
ericm-db 10b9444
adding check for startoffset
ericm-db 63ac0a9
adminUrl correction
ericm-db eb73093
only MessageId is null tests failing
ericm-db 6b858d7
adding pulsaroption
ericm-db 4347744
test case
ericm-db b491844
moving functionality to PulsarHelper
ericm-db 423efe4
feedback and refactoring
ericm-db 769890e
feedback
ericm-db dbadb7e
dealing with startLedgerId == ledger.id
ericm-db 54ea233
fix
ericm-db 843f7c9
check readLimit greater than 0
ericm-db 8f51130
early return if readLimitLeft == 0
ericm-db 0cf4dd7
increasing processing time
ericm-db fb645ec
removing unnecessary code
ericm-db f4a3b39
checking if consumer is connected + pulsaradmissionhelper
ericm-db be7d93c
putting latestOffsetForTopic in AdmissionControlHelper
ericm-db 9a4b1a4
added more tests for admission control
ericm-db b0a4450
changing where pulsarAdmin is set
ericm-db dbe6528
test where we add a new topic partition after stream has started
ericm-db b6a114c
fetchlatest -> gettopicpartitions
ericm-db 2e69b5f
more tests
ericm-db 4d00f88
changing AddPulsarDataWithPartition from Set(topic) -> topic
ericm-db 9369424
adding test case concurrent topic writes
ericm-db 3103b66
changing getAdminUrl and reducing offsets, startpartitionoffsets redu…
ericm-db 4712cf4
setting partition index in messageId correctly
ericm-db a3acee5
removing info logs
ericm-db bf3dfe6
removing maxEntriesPerLedger option
ericm-db 12deb6b
adding maxEntriesPerLedger in test
ericm-db b922024
maxEntriesPerLedger works
ericm-db eeb0b45
spacing
ericm-db c115f6f
checking numInputRows per microbatch in query
ericm-db 8c4a7e2
removing exact checklastbatch
ericm-db 6e7b93c
updating README
ericm-db c69d34b
updating readme
ericm-db 305fb2c
changing admin.url in readme
ericm-db 3fdc0b7
build errors
ericm-db File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,16 +22,22 @@ import scala.collection.mutable | |
| import scala.language.postfixOps | ||
| import scala.util.control.NonFatal | ||
|
|
||
| import org.apache.pulsar.client.admin.PulsarAdmin | ||
| import org.apache.pulsar.client.api.{MessageId, PulsarClient} | ||
| import org.apache.pulsar.client.impl.{MessageIdImpl, PulsarClientImpl} | ||
| import org.apache.pulsar.client.impl.schema.BytesSchema | ||
| import org.apache.pulsar.client.internal.DefaultImplementation | ||
| import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace | ||
| import org.apache.pulsar.common.naming.TopicName | ||
| import org.apache.pulsar.common.schema.SchemaInfo | ||
| import org.apache.pulsar.shade.com.google.common.util.concurrent.Uninterruptibles | ||
|
|
||
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.sql.connector.read.streaming | ||
| import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit} | ||
| import org.apache.spark.sql.pulsar.PulsarOptions._ | ||
| import org.apache.spark.sql.pulsar.PulsarSourceUtils.{getEntryId, getLedgerId} | ||
| import org.apache.spark.sql.pulsar.SpecificPulsarOffset.getTopicOffsets | ||
| import org.apache.spark.sql.types.StructType | ||
|
|
||
| /** | ||
|
|
@@ -40,6 +46,7 @@ import org.apache.spark.sql.types.StructType | |
| */ | ||
| private[pulsar] case class PulsarHelper( | ||
| serviceUrl: String, | ||
| adminUrl: Option[String], | ||
| clientConf: ju.Map[String, Object], | ||
| driverGroupIdPrefix: String, | ||
| caseInsensitiveParameters: Map[String, String], | ||
|
|
@@ -55,6 +62,12 @@ private[pulsar] case class PulsarHelper( | |
| private var topics: Seq[String] = _ | ||
| private var topicPartitions: Seq[String] = _ | ||
|
|
||
| // We can call adminUrl.get because admissionControlHelper | ||
| // will only be called if latestOffset is called and there should | ||
| // be an exception thrown in PulsarProvider if maxBytes is set, | ||
| // and adminUrl is not set | ||
| private lazy val admissionControlHelper = new PulsarAdmissionControlHelper(adminUrl.get) | ||
|
|
||
| override def close(): Unit = { | ||
| // do nothing | ||
| } | ||
|
|
@@ -122,7 +135,9 @@ private[pulsar] case class PulsarHelper( | |
| offset.foreach { case (tp, mid) => | ||
| try { | ||
| val (subscription, _) = extractSubscription(predefinedSubscription, tp) | ||
| CachedConsumer.getOrCreate(tp, subscription, client).seek(mid) | ||
| val consumer = CachedConsumer.getOrCreate(tp, subscription, client) | ||
| if (!consumer.isConnected) consumer.getLastMessageId | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know this is a bug that pulsar consumer do not attempt to reconnect when doing seek(), can you leave a comment here explaining why this change is needed and TODO that we will get rid of this once we upgraded to a version that has the fix? |
||
| consumer.seek(mid) | ||
| } catch { | ||
| case e: Throwable => | ||
| throw new RuntimeException( | ||
|
|
@@ -207,6 +222,35 @@ private[pulsar] case class PulsarHelper( | |
| }.toMap) | ||
| } | ||
|
|
||
| def latestOffsets(startingOffset: streaming.Offset, | ||
| totalReadLimit: Long): SpecificPulsarOffset = { | ||
| // implement helper inside PulsarHelper in order to use getTopicPartitions | ||
| val topicPartitions = getTopicPartitions | ||
| // add new partitions from PulsarAdmin, set to earliest entry and ledger id based on limit | ||
| // start a reader, get to the earliest offset for new topic partitions | ||
| val existingStartOffsets = if (startingOffset != null) { | ||
| getTopicOffsets(startingOffset.asInstanceOf[org.apache.spark.sql.execution.streaming.Offset]) | ||
| } else { | ||
| Map[String, MessageId]() | ||
| } | ||
| val newTopics = topicPartitions.toSet.diff(existingStartOffsets.keySet) | ||
| val startPartitionOffsets = existingStartOffsets ++ newTopics.map(topicPartition | ||
| => { | ||
| topicPartition -> MessageId.earliest | ||
| }) | ||
| val offsets = mutable.Map[String, MessageId]() | ||
| val numPartitions = startPartitionOffsets.size | ||
| // move all topic partition logic to helper function | ||
| val readLimit = totalReadLimit / numPartitions | ||
| startPartitionOffsets.keys.foreach { topicPartition => | ||
| val startMessageId = startPartitionOffsets.apply(topicPartition) | ||
| offsets += (topicPartition -> | ||
| admissionControlHelper.latestOffsetForTopicPartition( | ||
| topicPartition, startMessageId, readLimit)) | ||
| } | ||
| SpecificPulsarOffset(offsets.toMap) | ||
| } | ||
|
|
||
| def fetchLatestOffsetForTopic(topic: String): MessageId = { | ||
| val messageId = | ||
| try { | ||
|
|
@@ -472,3 +516,68 @@ private[pulsar] case class PulsarHelper( | |
| CachedConsumer.getOrCreate(topic, subscriptionName, client).getLastMessageId | ||
| } | ||
| } | ||
|
|
||
| class PulsarAdmissionControlHelper(adminUrl: String) | ||
| extends Logging { | ||
|
|
||
| private lazy val pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build() | ||
|
|
||
| import scala.collection.JavaConverters._ | ||
|
|
||
| def latestOffsetForTopicPartition(topicPartition: String, | ||
| startMessageId: MessageId, | ||
| readLimit: Long): MessageId = { | ||
| val startLedgerId = getLedgerId(startMessageId) | ||
| val startEntryId = getEntryId(startMessageId) | ||
| val stats = pulsarAdmin.topics.getInternalStats(topicPartition) | ||
| val ledgers = pulsarAdmin.topics.getInternalStats(topicPartition).ledgers. | ||
| asScala.filter(_.ledgerId >= startLedgerId).sortBy(_.ledgerId) | ||
| // The last ledger of the ledgers list doesn't have .size or .entries | ||
| // properly populated, and the corresponding info is in currentLedgerSize | ||
| // and currentLedgerEntries | ||
| if (ledgers.nonEmpty) { | ||
| ledgers.last.size = stats.currentLedgerSize | ||
| ledgers.last.entries = stats.currentLedgerEntries | ||
| } | ||
| val partitionIndex = if (topicPartition.contains(PartitionSuffix)) { | ||
| topicPartition.split(PartitionSuffix)(1).toInt | ||
| } else { | ||
| -1 | ||
| } | ||
| var messageId = startMessageId | ||
| var readLimitLeft = readLimit | ||
| ledgers.filter(_.entries != 0).sortBy(_.ledgerId).foreach { ledger => | ||
| assert(readLimitLeft >= 0) | ||
| if (readLimitLeft == 0) { | ||
|
ericm-db marked this conversation as resolved.
|
||
| return messageId | ||
| } | ||
| val avgBytesPerEntries = ledger.size / ledger.entries | ||
| // approximation of bytes left in ledger to deal with case | ||
| // where we are at the middle of the ledger | ||
| val bytesLeftInLedger = if (ledger.ledgerId == startLedgerId) { | ||
| avgBytesPerEntries * (ledger.entries - startEntryId - 1) | ||
| } else { | ||
| ledger.size | ||
| } | ||
| if (readLimitLeft > bytesLeftInLedger) { | ||
| readLimitLeft -= bytesLeftInLedger | ||
| messageId = DefaultImplementation | ||
| .getDefaultImplementation | ||
| .newMessageId(ledger.ledgerId, ledger.entries - 1, partitionIndex) | ||
| } else { | ||
| val numEntriesToRead = Math.max(1, readLimitLeft / avgBytesPerEntries) | ||
| val lastEntryId = if (ledger.ledgerId != startLedgerId) { | ||
| numEntriesToRead - 1 | ||
| } else { | ||
| startEntryId + numEntriesToRead | ||
| } | ||
| val lastEntryRead = Math.min(ledger.entries - 1, lastEntryId) | ||
| messageId = DefaultImplementation | ||
| .getDefaultImplementation | ||
| .newMessageId(ledger.ledgerId, lastEntryRead, partitionIndex) | ||
| readLimitLeft = 0 | ||
| } | ||
| } | ||
| messageId | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.