Skip to content

Commit cd93f8d

Browse files
authored
Adding maxBytesPerTrigger tag for Pulsar Admission Control (#151)
* init * correcting bytesLeftInLedger calculation * adding check for startoffset * adminUrl correction * only MessageId is null tests failing * adding pulsaroption * test case * moving functionality to PulsarHelper * feedback and refactoring * feedback * dealing with startLedgerId == ledger.id * fix * check readLimit greater than 0 * early return if readLimitLeft == 0 * increasing processing time * removing unnecessary code * checking if consumer is connected + pulsaradmissionhelper * putting latestOffsetForTopic in AdmissionControlHelper * added more tests for admission control * changing where pulsarAdmin is set * test where we add a new topic partition after stream has started * fetchlatest -> gettopicpartitions * more tests * changing AddPulsarDataWithPartition from Set(topic) -> topic * adding test case concurrent topic writes * changing getAdminUrl and reducing offsets, startpartitionoffsets redundancy * setting partition index in messageId correctly * removing info logs * removing maxEntriesPerLedger option * adding maxEntriesPerLedger in test * maxEntriesPerLedger works * spacing * checking numInputRows per microbatch in query * removing exact checklastbatch * updating README * updating readme * changing admin.url in readme * build errors
1 parent f1b8d99 commit cd93f8d

9 files changed

Lines changed: 553 additions & 15 deletions

File tree

README.md

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,12 +219,21 @@ A possible solution to remove duplicates when reading the written data could be
219219
</tr>
220220

221221
<tr>
222-
<td>`admin.url` (Deprecated)</td>
222+
<td>`admin.url`</td>
223223
<td>A service HTTP URL of your Pulsar cluster</td>
224224
<td>No</td>
225225
<td>None</td>
226226
<td>Streaming and Batch</td>
227-
<td>The Pulsar `serviceHttpUrl` configuration. </td>
227+
<td>The Pulsar `serviceHttpUrl` configuration. Only needed when `maxBytesPerTrigger` is specified</td>
228+
</tr>
229+
230+
<tr>
231+
<td>`maxBytesPerTrigger`</td>
232+
<td>A long value in unit of number of bytes</td>
233+
<td>No</td>
234+
<td>None</td>
235+
<td>Streaming and Batch</td>
236+
<td>A soft limit of the maximum number of bytes we want to process per microbatch. If this is specified, `admin.url` also needs to be specified.</td>
228237
</tr>
229238

230239
<tr>

src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala

Lines changed: 110 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,22 @@ import scala.collection.mutable
2222
import scala.language.postfixOps
2323
import scala.util.control.NonFatal
2424

25+
import org.apache.pulsar.client.admin.PulsarAdmin
2526
import org.apache.pulsar.client.api.{MessageId, PulsarClient}
2627
import org.apache.pulsar.client.impl.{MessageIdImpl, PulsarClientImpl}
2728
import org.apache.pulsar.client.impl.schema.BytesSchema
29+
import org.apache.pulsar.client.internal.DefaultImplementation
2830
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace
2931
import org.apache.pulsar.common.naming.TopicName
3032
import org.apache.pulsar.common.schema.SchemaInfo
3133
import org.apache.pulsar.shade.com.google.common.util.concurrent.Uninterruptibles
3234

3335
import org.apache.spark.internal.Logging
36+
import org.apache.spark.sql.connector.read.streaming
37+
import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit}
3438
import org.apache.spark.sql.pulsar.PulsarOptions._
39+
import org.apache.spark.sql.pulsar.PulsarSourceUtils.{getEntryId, getLedgerId}
40+
import org.apache.spark.sql.pulsar.SpecificPulsarOffset.getTopicOffsets
3541
import org.apache.spark.sql.types.StructType
3642

3743
/**
@@ -40,6 +46,7 @@ import org.apache.spark.sql.types.StructType
4046
*/
4147
private[pulsar] case class PulsarHelper(
4248
serviceUrl: String,
49+
adminUrl: Option[String],
4350
clientConf: ju.Map[String, Object],
4451
driverGroupIdPrefix: String,
4552
caseInsensitiveParameters: Map[String, String],
@@ -55,6 +62,12 @@ private[pulsar] case class PulsarHelper(
5562
private var topics: Seq[String] = _
5663
private var topicPartitions: Seq[String] = _
5764

65+
// We can call adminUrl.get because admissionControlHelper
66+
// will only be called if latestOffset is called and there should
67+
// be an exception thrown in PulsarProvider if maxBytes is set,
68+
// and adminUrl is not set
69+
private lazy val admissionControlHelper = new PulsarAdmissionControlHelper(adminUrl.get)
70+
5871
override def close(): Unit = {
5972
// do nothing
6073
}
@@ -122,7 +135,9 @@ private[pulsar] case class PulsarHelper(
122135
offset.foreach { case (tp, mid) =>
123136
try {
124137
val (subscription, _) = extractSubscription(predefinedSubscription, tp)
125-
CachedConsumer.getOrCreate(tp, subscription, client).seek(mid)
138+
val consumer = CachedConsumer.getOrCreate(tp, subscription, client)
139+
if (!consumer.isConnected) consumer.getLastMessageId
140+
consumer.seek(mid)
126141
} catch {
127142
case e: Throwable =>
128143
throw new RuntimeException(
@@ -207,6 +222,35 @@ private[pulsar] case class PulsarHelper(
207222
}.toMap)
208223
}
209224

225+
def latestOffsets(startingOffset: streaming.Offset,
226+
totalReadLimit: Long): SpecificPulsarOffset = {
227+
// implement helper inside PulsarHelper in order to use getTopicPartitions
228+
val topicPartitions = getTopicPartitions
229+
// add new partitions from PulsarAdmin, set to earliest entry and ledger id based on limit
230+
// start a reader, get to the earliest offset for new topic partitions
231+
val existingStartOffsets = if (startingOffset != null) {
232+
getTopicOffsets(startingOffset.asInstanceOf[org.apache.spark.sql.execution.streaming.Offset])
233+
} else {
234+
Map[String, MessageId]()
235+
}
236+
val newTopics = topicPartitions.toSet.diff(existingStartOffsets.keySet)
237+
val startPartitionOffsets = existingStartOffsets ++ newTopics.map(topicPartition
238+
=> {
239+
topicPartition -> MessageId.earliest
240+
})
241+
val offsets = mutable.Map[String, MessageId]()
242+
val numPartitions = startPartitionOffsets.size
243+
// move all topic partition logic to helper function
244+
val readLimit = totalReadLimit / numPartitions
245+
startPartitionOffsets.keys.foreach { topicPartition =>
246+
val startMessageId = startPartitionOffsets.apply(topicPartition)
247+
offsets += (topicPartition ->
248+
admissionControlHelper.latestOffsetForTopicPartition(
249+
topicPartition, startMessageId, readLimit))
250+
}
251+
SpecificPulsarOffset(offsets.toMap)
252+
}
253+
210254
def fetchLatestOffsetForTopic(topic: String): MessageId = {
211255
val messageId =
212256
try {
@@ -472,3 +516,68 @@ private[pulsar] case class PulsarHelper(
472516
CachedConsumer.getOrCreate(topic, subscriptionName, client).getLastMessageId
473517
}
474518
}
519+
520+
class PulsarAdmissionControlHelper(adminUrl: String)
521+
extends Logging {
522+
523+
private lazy val pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()
524+
525+
import scala.collection.JavaConverters._
526+
527+
def latestOffsetForTopicPartition(topicPartition: String,
528+
startMessageId: MessageId,
529+
readLimit: Long): MessageId = {
530+
val startLedgerId = getLedgerId(startMessageId)
531+
val startEntryId = getEntryId(startMessageId)
532+
val stats = pulsarAdmin.topics.getInternalStats(topicPartition)
533+
val ledgers = pulsarAdmin.topics.getInternalStats(topicPartition).ledgers.
534+
asScala.filter(_.ledgerId >= startLedgerId).sortBy(_.ledgerId)
535+
// The last ledger of the ledgers list doesn't have .size or .entries
536+
// properly populated, and the corresponding info is in currentLedgerSize
537+
// and currentLedgerEntries
538+
if (ledgers.nonEmpty) {
539+
ledgers.last.size = stats.currentLedgerSize
540+
ledgers.last.entries = stats.currentLedgerEntries
541+
}
542+
val partitionIndex = if (topicPartition.contains(PartitionSuffix)) {
543+
topicPartition.split(PartitionSuffix)(1).toInt
544+
} else {
545+
-1
546+
}
547+
var messageId = startMessageId
548+
var readLimitLeft = readLimit
549+
ledgers.filter(_.entries != 0).sortBy(_.ledgerId).foreach { ledger =>
550+
assert(readLimitLeft >= 0)
551+
if (readLimitLeft == 0) {
552+
return messageId
553+
}
554+
val avgBytesPerEntries = ledger.size / ledger.entries
555+
// approximation of bytes left in ledger to deal with case
556+
// where we are at the middle of the ledger
557+
val bytesLeftInLedger = if (ledger.ledgerId == startLedgerId) {
558+
avgBytesPerEntries * (ledger.entries - startEntryId - 1)
559+
} else {
560+
ledger.size
561+
}
562+
if (readLimitLeft > bytesLeftInLedger) {
563+
readLimitLeft -= bytesLeftInLedger
564+
messageId = DefaultImplementation
565+
.getDefaultImplementation
566+
.newMessageId(ledger.ledgerId, ledger.entries - 1, partitionIndex)
567+
} else {
568+
val numEntriesToRead = Math.max(1, readLimitLeft / avgBytesPerEntries)
569+
val lastEntryId = if (ledger.ledgerId != startLedgerId) {
570+
numEntriesToRead - 1
571+
} else {
572+
startEntryId + numEntriesToRead
573+
}
574+
val lastEntryRead = Math.min(ledger.entries - 1, lastEntryId)
575+
messageId = DefaultImplementation
576+
.getDefaultImplementation
577+
.newMessageId(ledger.ledgerId, lastEntryRead, partitionIndex)
578+
readLimitLeft = 0
579+
}
580+
}
581+
messageId
582+
}
583+
}

src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ private[pulsar] object PulsarOptions {
3636
val TopicOptionKeys: Set[String] = Set(TopicSingle, TopicMulti, TopicPattern)
3737

3838
val ServiceUrlOptionKey: String = "service.url"
39+
val AdminUrlOptionKey: String = "admin.url"
3940
val StartingOffsetsOptionKey: String = "startingOffsets".toLowerCase(Locale.ROOT)
4041
val StartingTime: String = "startingTime".toLowerCase(Locale.ROOT)
4142
val EndingTime: String = "endingTime".toLowerCase(Locale.ROOT)
@@ -45,6 +46,7 @@ private[pulsar] object PulsarOptions {
4546
val SubscriptionPrefix: String = "subscriptionPrefix".toLowerCase(Locale.ROOT)
4647
val PredefinedSubscription: String = "predefinedSubscription".toLowerCase(Locale.ROOT)
4748

49+
val MaxBytesPerTrigger: String = "maxBytesPerTrigger".toLowerCase(Locale.ROOT)
4850
val PollTimeoutMS: String = "pollTimeoutMs".toLowerCase(Locale.ROOT)
4951
val FailOnDataLossOptionKey: String = "failOnDataLoss".toLowerCase(Locale.ROOT)
5052

src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,13 @@ private[pulsar] class PulsarProvider
5656
parameters: Map[String, String]): (String, StructType) = {
5757

5858
val caseInsensitiveParams = validateStreamOptions(parameters)
59-
val (clientConfig, _, serviceUrlConfig) = prepareConfForReader(parameters)
59+
val (clientConfig, _, serviceUrlConfig, adminUrl) = prepareConfForReader(parameters)
6060

6161
val subscriptionNamePrefix = s"spark-pulsar-${UUID.randomUUID}"
6262
val inferredSchema = Utils.tryWithResource(
6363
PulsarHelper(
6464
serviceUrlConfig,
65+
adminUrl,
6566
clientConfig,
6667
subscriptionNamePrefix,
6768
caseInsensitiveParams,
@@ -84,13 +85,14 @@ private[pulsar] class PulsarProvider
8485
logDebug(s"Creating Pulsar source: $parameters")
8586

8687
val caseInsensitiveParams = validateStreamOptions(parameters)
87-
val (clientConfig, readerConfig, serviceUrl) = prepareConfForReader(parameters)
88+
val (clientConfig, readerConfig, serviceUrl, adminUrl) = prepareConfForReader(parameters)
8889
logDebug(
8990
s"Client config: $clientConfig; Reader config: $readerConfig; Service URL: $serviceUrl")
9091

9192
val subscriptionNamePrefix = getSubscriptionPrefix(parameters)
9293
val pulsarHelper = PulsarHelper(
9394
serviceUrl,
95+
adminUrl,
9496
clientConfig,
9597
subscriptionNamePrefix,
9698
caseInsensitiveParams,
@@ -105,6 +107,12 @@ private[pulsar] class PulsarProvider
105107
pulsarHelper.offsetForEachTopic(caseInsensitiveParams, LatestOffset, StartOptionKey)
106108
pulsarHelper.setupCursor(offset)
107109

110+
val maxBytes = maxBytesPerTrigger(caseInsensitiveParams)
111+
if (adminUrl.isEmpty && maxBytes != 0L) {
112+
throw new IllegalArgumentException("admin.url " +
113+
"must be specified if maxBytesPerTrigger is specified")
114+
}
115+
108116
new PulsarSource(
109117
sqlContext,
110118
pulsarHelper,
@@ -113,6 +121,7 @@ private[pulsar] class PulsarProvider
113121
metadataPath,
114122
offset,
115123
pollTimeoutMs(caseInsensitiveParams),
124+
maxBytesPerTrigger(caseInsensitiveParams),
116125
failOnDataLoss(caseInsensitiveParams),
117126
subscriptionNamePrefix,
118127
jsonOptions)
@@ -125,10 +134,11 @@ private[pulsar] class PulsarProvider
125134

126135
val subscriptionNamePrefix = getSubscriptionPrefix(parameters, isBatch = true)
127136

128-
val (clientConfig, readerConfig, serviceUrl) = prepareConfForReader(parameters)
137+
val (clientConfig, readerConfig, serviceUrl, adminUrl) = prepareConfForReader(parameters)
129138
val (start, end, schema, pSchema) = Utils.tryWithResource(
130139
PulsarHelper(
131140
serviceUrl,
141+
adminUrl,
132142
clientConfig,
133143
subscriptionNamePrefix,
134144
caseInsensitiveParams,
@@ -366,6 +376,10 @@ private[pulsar] object PulsarProvider extends Logging {
366376
parameters(ServiceUrlOptionKey)
367377
}
368378

379+
private def getAdminUrl(parameters: Map[String, String]): Option[String] = {
380+
parameters.get(AdminUrlOptionKey)
381+
}
382+
369383
private def getAllowDifferentTopicSchemas(parameters: Map[String, String]): Boolean = {
370384
parameters.getOrElse(AllowDifferentTopicSchemas, "false").toBoolean
371385
}
@@ -380,6 +394,13 @@ private[pulsar] object PulsarProvider extends Logging {
380394
(SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000).toString)
381395
.toInt
382396

397+
private def maxBytesPerTrigger(caseInsensitiveParams: Map[String, String]): Long =
398+
caseInsensitiveParams
399+
.getOrElse(
400+
PulsarOptions.MaxBytesPerTrigger,
401+
0L.toString
402+
).toLong
403+
383404
private def validateGeneralOptions(
384405
caseInsensitiveParams: Map[String, String]): Map[String, String] = {
385406
if (!caseInsensitiveParams.contains(ServiceUrlOptionKey)) {
@@ -486,17 +507,18 @@ private[pulsar] object PulsarProvider extends Logging {
486507
}
487508

488509
private def prepareConfForReader(parameters: Map[String, String])
489-
: (ju.Map[String, Object], ju.Map[String, Object], String) = {
510+
: (ju.Map[String, Object], ju.Map[String, Object], String, Option[String]) = {
490511

491512
val serviceUrl = getServiceUrl(parameters)
513+
val adminUrl = getAdminUrl(parameters)
492514
var clientParams = getClientParams(parameters)
493515
clientParams += (ServiceUrlOptionKey -> serviceUrl)
494516
val readerParams = getReaderParams(parameters)
495517

496518
(
497519
paramsToPulsarConf("pulsar.client", clientParams),
498520
paramsToPulsarConf("pulsar.reader", readerParams),
499-
serviceUrl)
521+
serviceUrl, adminUrl)
500522
}
501523

502524
private def prepareConfForProducer(parameters: Map[String, String])

0 commit comments

Comments
 (0)