Skip to content

Commit e7e87b6

Browse files
committed
moving functionality to PulsarHelper
1 parent 25c39eb commit e7e87b6

3 files changed

Lines changed: 85 additions & 70 deletions

File tree

src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,22 @@ import scala.collection.mutable
2222
import scala.language.postfixOps
2323
import scala.util.control.NonFatal
2424

25+
import org.apache.pulsar.client.admin.PulsarAdmin
2526
import org.apache.pulsar.client.api.{MessageId, PulsarClient}
2627
import org.apache.pulsar.client.impl.{MessageIdImpl, PulsarClientImpl}
2728
import org.apache.pulsar.client.impl.schema.BytesSchema
29+
import org.apache.pulsar.client.internal.DefaultImplementation
2830
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace
2931
import org.apache.pulsar.common.naming.TopicName
3032
import org.apache.pulsar.common.schema.SchemaInfo
3133
import org.apache.pulsar.shade.com.google.common.util.concurrent.Uninterruptibles
3234

3335
import org.apache.spark.internal.Logging
36+
import org.apache.spark.sql.connector.read.streaming
37+
import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit}
3438
import org.apache.spark.sql.pulsar.PulsarOptions._
39+
import org.apache.spark.sql.pulsar.PulsarSourceUtils.{getEntryId, getLedgerId}
40+
import org.apache.spark.sql.pulsar.SpecificPulsarOffset.getTopicOffsets
3541
import org.apache.spark.sql.types.StructType
3642

3743
/**
@@ -40,6 +46,7 @@ import org.apache.spark.sql.types.StructType
4046
*/
4147
private[pulsar] case class PulsarHelper(
4248
serviceUrl: String,
49+
adminUrl: String,
4350
clientConf: ju.Map[String, Object],
4451
driverGroupIdPrefix: String,
4552
caseInsensitiveParameters: Map[String, String],
@@ -55,6 +62,8 @@ private[pulsar] case class PulsarHelper(
5562
private var topics: Seq[String] = _
5663
private var topicPartitions: Seq[String] = _
5764

65+
private lazy val pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()
66+
5867
/**
 * Releases resources held by this helper. Currently a deliberate no-op.
 *
 * NOTE(review): the lazily created `pulsarAdmin` client is not closed here;
 * confirm whether it should be shut down when it has been initialised.
 */
override def close(): Unit = ()
@@ -207,6 +216,63 @@ private[pulsar] case class PulsarHelper(
207216
}.toMap)
208217
}
209218

219+
/**
 * Computes the end offsets of the next micro-batch under a byte budget.
 *
 * The total budget in `admissionLimits.bytesToTake` is split evenly across all
 * known topic partitions. For each partition the ledgers at or after the start
 * position are walked in ledger-id order, and the end offset is advanced until
 * the partition's share of the budget is used up. Sizes are estimated from the
 * average bytes per entry of each ledger, so the limit is a soft maximum.
 * Whenever a partition has unread entries, at least one entry is admitted so
 * the stream always makes progress.
 *
 * @param startingOffset  offsets the batch starts from, or `null` when there is
 *                        no previous offset
 * @param admissionLimits byte budget for the whole batch
 * @return the end offset per topic partition; partitions with nothing to read
 *         keep their start offset
 */
def latestOffsets(startingOffset: streaming.Offset,
                  admissionLimits: AdmissionLimits): SpecificPulsarOffset = {
  // Only the key set is used: it reveals partitions that have no start offset yet.
  val latestTopicOffsets = fetchLatestOffsets().topicOffsets
  val existingStartOffsets = if (startingOffset != null) {
    getTopicOffsets(startingOffset.asInstanceOf[org.apache.spark.sql.execution.streaming.Offset])
  } else {
    Map[String, MessageId]()
  }
  // Newly discovered partitions start at their current latest message.
  val newTopics = latestTopicOffsets.keySet.diff(existingStartOffsets.keySet)
  val startPartitionOffsets = existingStartOffsets ++ newTopics.map(topicPartition
    => topicPartition -> fetchLatestOffsetForTopic(topicPartition))

  val offsets = mutable.Map[String, MessageId]()
  offsets ++= startPartitionOffsets
  val numPartitions = startPartitionOffsets.size

  startPartitionOffsets.foreach { case (topicPartition, startMessageId) =>
    val startLedgerId = getLedgerId(startMessageId)
    val startEntryId = getEntryId(startMessageId)

    // Take a single stats snapshot per partition so that every count and size
    // used below is mutually consistent (and only one admin call is made).
    val stats = pulsarAdmin.topics.getInternalStats(topicPartition)
    val candidateLedgers =
      stats.ledgers.asScala.filter(_.ledgerId >= startLedgerId).sortBy(_.ledgerId)
    // The ledger with the highest id is treated as the currently open ledger;
    // its counters are read from currentLedgerEntries / currentLedgerSize.
    val openLedgerId = candidateLedgers.lastOption.map(_.ledgerId)

    var remainingBytes = admissionLimits.bytesToTake / numPartitions
    var budgetExhausted = false
    val ledgerIterator = candidateLedgers.iterator
    while (!budgetExhausted && ledgerIterator.hasNext) {
      val ledger = ledgerIterator.next()
      val isOpenLedger = openLedgerId.contains(ledger.ledgerId)
      // Read the counters without mutating the stats object returned by the admin client.
      val entriesInLedger = if (isOpenLedger) stats.currentLedgerEntries else ledger.entries
      val bytesInLedger = if (isOpenLedger) stats.currentLedgerSize else ledger.size

      // Last entry already consumed in this ledger; -1 means nothing consumed yet.
      // The start entry id is only meaningful inside the start ledger.
      val lastConsumedEntry = if (ledger.ledgerId == startLedgerId) startEntryId else -1L
      val unreadEntries = entriesInLedger - 1 - lastConsumedEntry

      if (unreadEntries > 0) {
        // unreadEntries > 0 implies entriesInLedger >= 1, so the division is safe.
        // Clamp to 1 so a sub-byte integer average cannot cause division by zero below.
        val avgBytesPerEntry = math.max(1L, bytesInLedger / entriesInLedger)
        val bytesLeftInLedger = avgBytesPerEntry * unreadEntries
        if (remainingBytes > bytesLeftInLedger) {
          // The rest of this ledger fits: consume it and continue with the next ledger.
          remainingBytes -= bytesLeftInLedger
          offsets += (topicPartition -> DefaultImplementation
            .getDefaultImplementation
            .newMessageId(ledger.ledgerId, entriesInLedger - 1, -1))
        } else {
          // Budget ends inside this ledger; always admit at least one entry.
          val numEntriesToRead = math.max(1L, remainingBytes / avgBytesPerEntry)
          val lastEntryRead =
            math.min(entriesInLedger - 1, lastConsumedEntry + numEntriesToRead)
          offsets += (topicPartition -> DefaultImplementation
            .getDefaultImplementation
            .newMessageId(ledger.ledgerId, lastEntryRead, -1))
          // Stop here so later ledgers are not admitted beyond the budget.
          budgetExhausted = true
        }
      }
    }
  }
  SpecificPulsarOffset(offsets.toMap)
}
275+
210276
def fetchLatestOffsetForTopic(topic: String): MessageId = {
211277
val messageId =
212278
try {

src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,13 @@ private[pulsar] class PulsarProvider
5656
parameters: Map[String, String]): (String, StructType) = {
5757

5858
val caseInsensitiveParams = validateStreamOptions(parameters)
59-
val (clientConfig, _, serviceUrlConfig, _) = prepareConfForReader(parameters)
59+
val (clientConfig, _, serviceUrlConfig, adminUrl) = prepareConfForReader(parameters)
6060

6161
val subscriptionNamePrefix = s"spark-pulsar-${UUID.randomUUID}"
6262
val inferredSchema = Utils.tryWithResource(
6363
PulsarHelper(
6464
serviceUrlConfig,
65+
adminUrl,
6566
clientConfig,
6667
subscriptionNamePrefix,
6768
caseInsensitiveParams,
@@ -91,6 +92,7 @@ private[pulsar] class PulsarProvider
9192
val subscriptionNamePrefix = getSubscriptionPrefix(parameters)
9293
val pulsarHelper = PulsarHelper(
9394
serviceUrl,
95+
adminUrl,
9496
clientConfig,
9597
subscriptionNamePrefix,
9698
caseInsensitiveParams,
@@ -127,10 +129,11 @@ private[pulsar] class PulsarProvider
127129

128130
val subscriptionNamePrefix = getSubscriptionPrefix(parameters, isBatch = true)
129131

130-
val (clientConfig, readerConfig, serviceUrl, _) = prepareConfForReader(parameters)
132+
val (clientConfig, readerConfig, serviceUrl, adminUrl) = prepareConfForReader(parameters)
131133
val (start, end, schema, pSchema) = Utils.tryWithResource(
132134
PulsarHelper(
133135
serviceUrl,
136+
adminUrl,
134137
clientConfig,
135138
subscriptionNamePrefix,
136139
caseInsensitiveParams,

src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala

Lines changed: 14 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -68,89 +68,25 @@ private[pulsar] class PulsarSource(
6868

6969
private var currentTopicOffsets: Option[Map[String, MessageId]] = None
7070

71-
private lazy val pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(serviceUrl).build()
7271

7372
private lazy val pulsarSchema: SchemaInfo = pulsarHelper.getPulsarSchema
7473

7574
override def schema(): StructType = SchemaUtils.pulsarSourceSchema(pulsarSchema)
7675

7776
override def getOffset: Option[Offset] = {
78-
// Make sure initialTopicOffsets is initialized
79-
initialTopicOffsets
80-
val latest = pulsarHelper.fetchLatestOffsets()
81-
currentTopicOffsets = Some(latest.topicOffsets)
82-
Some(latest.asInstanceOf[Offset])
77+
throw new UnsupportedOperationException(
78+
"latestOffset(Offset, ReadLimit) should be called instead of this method")
8379
}
8480

8581
override def latestOffset(startingOffset: streaming.Offset,
8682
readLimit: ReadLimit): streaming.Offset = {
8783
initialTopicOffsets
88-
// implement helper inside PulsarHelper in order to use getTopicPartitions
89-
val latestOffsets = pulsarHelper.fetchLatestOffsets().topicOffsets
90-
// add new partitions from PulsarAdmin, set to earliest entry and ledger id based on limit
91-
// start a reader, get to the earliest offset for new topic partitions
92-
val existingStartOffsets = if (startingOffset != null) {
93-
getTopicOffsets(startingOffset.asInstanceOf[org.apache.spark.sql.execution.streaming.Offset])
94-
} else {
95-
Map[String, MessageId]()
96-
}
97-
val newTopics = latestOffsets.keySet.diff(existingStartOffsets.keySet)
98-
val startPartitionOffsets = existingStartOffsets ++ newTopics.map(topicPartition
99-
=> topicPartition -> pulsarHelper.fetchLatestOffsetForTopic(topicPartition))
100-
val totalReadLimit = AdmissionLimits(readLimit).get.bytesToTake
101-
val offsets = mutable.Map[String, MessageId]()
102-
offsets ++= startPartitionOffsets
103-
val numPartitions = startPartitionOffsets.size
104-
startPartitionOffsets.keys.filter(topicPartition => {
105-
pulsarAdmin.topics.getInternalStats(topicPartition).currentLedgerEntries > 0
106-
}).foreach { topicPartition =>
107-
var readLimit = totalReadLimit / numPartitions
108-
val messageId = startPartitionOffsets.apply(topicPartition)
109-
val ledgerId = getLedgerId(messageId)
110-
val entryId = getEntryId(messageId)
111-
val stats = pulsarAdmin.topics.getInternalStats(topicPartition)
112-
pulsarAdmin.topics.getInternalStats(topicPartition).ledgers.
113-
asScala.filter(_.ledgerId >= ledgerId).sortBy(_.ledgerId).foreach{ ledger =>
114-
ledger.entries = stats.currentLedgerEntries
115-
val avgBytesPerEntries = stats.currentLedgerSize / stats.currentLedgerEntries
116-
// approximation of bytes left in ledger to deal with case
117-
// where we are at the middle of the ledger
118-
val bytesLeftInLedger = avgBytesPerEntries * {
119-
if (ledger.ledgerId == ledgerId) {
120-
ledger.entries - entryId
121-
} else {
122-
ledger.entries
123-
}
124-
}
125-
if (readLimit > bytesLeftInLedger) {
126-
readLimit -= bytesLeftInLedger
127-
offsets += (topicPartition -> DefaultImplementation
128-
.getDefaultImplementation
129-
.newMessageId(ledger.ledgerId, ledger.entries - 1, -1))
130-
} else {
131-
val numEntriesToRead = Math.max(1, readLimit / avgBytesPerEntries)
132-
val lastEntryRead = Math.min(ledger.entries - 1, entryId + numEntriesToRead)
133-
offsets += (topicPartition -> DefaultImplementation
134-
.getDefaultImplementation
135-
.newMessageId(ledger.ledgerId, lastEntryRead, -1))
136-
readLimit = 0
137-
}
138-
}
139-
}
140-
SpecificPulsarOffset(offsets.toMap)
84+
val admissionLimits = AdmissionLimits(readLimit)
85+
pulsarHelper.latestOffsets(startingOffset, admissionLimits.get)
14186
}
14287
/** Default read limit for this source: a byte cap of `maxBytesPerTrigger`. */
override def getDefaultReadLimit: ReadLimit = ReadMaxBytes(maxBytesPerTrigger)
145-
class AdmissionLimits(var bytesToTake: Long)
146-
147-
object AdmissionLimits {
148-
def apply(limit: ReadLimit): Option[AdmissionLimits] = limit match {
149-
case maxBytes: ReadMaxBytes => Some(new AdmissionLimits(maxBytes.maxBytes))
150-
case _ : ReadAllAvailable => Some(new AdmissionLimits(Int.MaxValue))
151-
}
152-
153-
}
15490

15591
override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
15692
// Make sure initialTopicOffsets is initialized
@@ -257,3 +193,13 @@ private[pulsar] class PulsarSource(
257193

258194
/** A read limit that admits a soft-max of `maxBytes` per micro-batch. */
259195
case class ReadMaxBytes(maxBytes: Long) extends ReadLimit
196+
197+
/** Byte budget that a micro-batch is allowed to admit. */
class AdmissionLimits(var bytesToTake: Long)

object AdmissionLimits {

  /**
   * Translates a [[ReadLimit]] into a byte budget.
   *
   * @param limit the read limit requested for the batch
   * @return a budget of `maxBytes` for [[ReadMaxBytes]], an effectively unbounded
   *         budget for [[ReadAllAvailable]], and `None` for any other limit type
   */
  def apply(limit: ReadLimit): Option[AdmissionLimits] = limit match {
    case maxBytes: ReadMaxBytes => Some(new AdmissionLimits(maxBytes.maxBytes))
    // bytesToTake is a Long: use the Long maximum so "all available" is not capped at ~2 GiB.
    case _: ReadAllAvailable => Some(new AdmissionLimits(Long.MaxValue))
    // Unsupported limit types yield None instead of failing with a MatchError.
    case _ => None
  }

}

0 commit comments

Comments
 (0)