Skip to content

Commit f1b8d99

Browse files
Fix data loss of initial batch and add available now trigger test (#156)
* fix * fix sink suite * fix data loss * add available now trigger testing * make name more descriptive and try fixing tests
1 parent 67d5c58 commit f1b8d99

2 files changed

Lines changed: 262 additions & 1 deletion

File tree

src/main/scala/org/apache/spark/sql/pulsar/PulsarSources.scala

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,14 +162,28 @@ class PulsarSourceInitialOffsetWriter(sparkSession: SparkSession, metadataPath:
162162
startingOffsets: PerTopicOffset,
163163
poolTimeoutMs: Int,
164164
reportDataLoss: String => Unit): SpecificPulsarOffset = {
165-
get(0).getOrElse {
165+
val deserializedOffset = get(0).map(markOffsetUserProvided(_))
166+
deserializedOffset.getOrElse {
166167
val actualOffsets = SpecificPulsarOffset(
167168
pulsarHelper.actualOffsets(startingOffsets, poolTimeoutMs, reportDataLoss))
168169
add(0, actualOffsets)
169170
logInfo(s"Initial Offsets: $actualOffsets")
170171
actualOffsets
171172
}
172173
}
174+
175+
/**
 * Marks the message ids of a specific offset as user provided so that first records are not
 * skipped.
 *
 * This is needed because when initial offsets are deserialized from the metadata log, they
 * lose the `UserProvidedMessageId` type.
 *
 * Entries whose message id is already a `UserProvidedMessageId`, or is equal to
 * `MessageId.earliest`, are kept as they are; every other message id is wrapped in a
 * `UserProvidedMessageId`.
 *
 * @param offsets the per-topic offsets to re-tag
 * @return a new `SpecificPulsarOffset` built from the re-tagged per-topic message ids
 */
private def markOffsetUserProvided(offsets: SpecificPulsarOffset): SpecificPulsarOffset = {
  val retagged = offsets.topicOffsets.map {
    // Already tagged: nothing to do.
    case unchanged @ (_, _: UserProvidedMessageId) => unchanged
    // The earliest marker is passed through without wrapping.
    case unchanged @ (_, messageId) if messageId == MessageId.earliest => unchanged
    case (topic, messageId) => (topic, UserProvidedMessageId(messageId))
  }
  SpecificPulsarOffset(retagged)
}
173187
}
174188

175189
private[pulsar] case class PulsarOffsetRange private (
Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
package org.apache.spark.sql.pulsar
2+
3+
import org.apache.spark.sql.pulsar.PulsarOptions.{FailOnDataLossOptionKey, ServiceUrlOptionKey, StartingOffsetsOptionKey, StartingTime, TopicMulti, TopicPattern, TopicSingle}
4+
import org.apache.spark.sql.streaming.Trigger
5+
6+
class PulsarMicroBatchSourceTriggerAvailableNowSuite extends PulsarSourceTest {
7+
import testImplicits._
8+
override val defaultTrigger: Trigger = Trigger.AvailableNow()
9+
val failOnDataLoss = true
10+
11+
// Test registrations. Every case delegates to one of the private helpers of this suite and
// passes the suite-wide `failOnDataLoss` flag. Test names are plain literals: none of them
// interpolates a value, so no `s` interpolator is needed.

// --- single topic (TopicSingle) ---

test("assign from latest offsets: available now") {
  val topic = newTopic()
  testFromLatestOffsets(
    topic,
    addPartitions = false,
    failOnDataLoss = failOnDataLoss,
    TopicSingle -> topic)
}

test("assign from earliest offsets: available now") {
  val topic = newTopic()
  testFromEarliestOffsets(
    topic,
    addPartitions = false,
    failOnDataLoss = failOnDataLoss,
    TopicSingle -> topic)
}

test("assign from time: available now") {
  val topic = newTopic()
  testFromTime(
    topic,
    addPartitions = false,
    failOnDataLoss = failOnDataLoss,
    TopicSingle -> topic)
}

test("assign from specific offsets: available now") {
  val topic = newTopic()
  // FailOnDataLossOptionKey is not repeated here: testFromSpecificOffsets already sets it
  // from its failOnDataLoss argument, exactly like the other specific-offset tests.
  testFromSpecificOffsets(
    topic,
    failOnDataLoss = failOnDataLoss,
    TopicSingle -> topic)
}

// --- topic list (TopicMulti) ---

test("subscribing topic by name from latest offsets: available now") {
  val topic = newTopic()
  testFromLatestOffsets(
    topic,
    addPartitions = true,
    failOnDataLoss = failOnDataLoss,
    TopicMulti -> topic)
}

test("subscribing topic by name from earliest offsets: available now") {
  val topic = newTopic()
  testFromEarliestOffsets(
    topic,
    addPartitions = true,
    failOnDataLoss = failOnDataLoss,
    TopicMulti -> topic)
}

test("subscribing topic by name from specific offsets: available now") {
  val topic = newTopic()
  testFromSpecificOffsets(topic, failOnDataLoss = failOnDataLoss, TopicMulti -> topic)
}

// --- topic pattern (TopicPattern) ---
// The topic is "<prefix>-suffix" and the subscription pattern is "<prefix>-.*".

test("subscribing topic by pattern from latest offsets: available now") {
  val topicPrefix = newTopic()
  val topic = s"$topicPrefix-suffix"
  testFromLatestOffsets(
    topic,
    addPartitions = true,
    failOnDataLoss = failOnDataLoss,
    TopicPattern -> s"$topicPrefix-.*")
}

test("subscribing topic by pattern from earliest offsets: available now") {
  val topicPrefix = newTopic()
  val topic = s"$topicPrefix-suffix"
  testFromEarliestOffsets(
    topic,
    addPartitions = true,
    failOnDataLoss = failOnDataLoss,
    TopicPattern -> s"$topicPrefix-.*")
}

test("subscribing topic by pattern from specific offsets: available now") {
  val topicPrefix = newTopic()
  val topic = s"$topicPrefix-suffix"
  testFromSpecificOffsets(
    topic,
    failOnDataLoss = failOnDataLoss,
    TopicPattern -> s"$topicPrefix-.*")
}
98+
99+
/**
 * Reads `topic` starting from the "latest" offsets and checks the accumulated output after
 * each of three add-data / stop / restart rounds.
 *
 * @param topic topic that messages are published to
 * @param addPartitions not used by this helper
 * @param failOnDataLoss value written to `FailOnDataLossOptionKey`
 * @param options extra reader options, applied after the defaults set here
 */
private def testFromLatestOffsets(
    topic: String,
    addPartitions: Boolean,
    failOnDataLoss: Boolean,
    options: (String, String)*): Unit = {

  // Published before the stream is defined; none of the expected answers below contains
  // the values these would map to (0, 1, 2).
  sendMessages(topic, Array("-1", "0", "1"))
  require(getLatestOffsets(Set(topic)).size === 1)

  val baseReader = spark.readStream
    .format("pulsar")
    .option(StartingOffsetsOptionKey, "latest")
    .option(ServiceUrlOptionKey, serviceUrl)
    .option(FailOnDataLossOptionKey, failOnDataLoss.toString)
  // Thread the caller-supplied options through the builder one by one.
  val configuredReader = options.foldLeft(baseReader) { case (builder, (key, value)) =>
    builder.option(key, value)
  }

  val keyedValues = configuredReader
    .load()
    .selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
    .as[(String, String)]
  // Every record value is parsed as an Int and incremented by one.
  val incremented = keyedValues.map(record => record._2.toInt + 1)

  testStream(incremented)(
    AddPulsarData(Set(topic), 2, 3),
    StopStream,
    StartStream(),
    CheckAnswer(3, 4),
    AddPulsarData(Set(topic), 4, 5, 6),
    StopStream,
    StartStream(),
    CheckAnswer(3, 4, 5, 6, 7),
    AddPulsarData(Set(topic), 7, 8),
    StopStream,
    StartStream(),
    CheckAnswer(3, 4, 5, 6, 7, 8, 9)
  )
}
136+
137+
/**
 * Reads `topic` starting from the "earliest" offsets and checks that the messages published
 * before the stream was defined are returned together with the data added in each of three
 * add-data / stop / restart rounds.
 *
 * @param topic topic that messages are published to
 * @param addPartitions not used by this helper
 * @param failOnDataLoss value written to `FailOnDataLossOptionKey`
 * @param options extra reader options, applied after the defaults set here
 */
private def testFromEarliestOffsets(
    topic: String,
    addPartitions: Boolean,
    failOnDataLoss: Boolean,
    options: (String, String)*): Unit = {

  // "1", "2", "3" are in the topic before the stream is defined.
  val initialPayloads = (1 to 3).map(_.toString).toArray
  sendMessages(topic, initialPayloads)
  require(getLatestOffsets(Set(topic)).size === 1)

  val baseReader = spark.readStream
    .format("pulsar")
    .option(StartingOffsetsOptionKey, "earliest")
    .option(ServiceUrlOptionKey, serviceUrl)
    .option(FailOnDataLossOptionKey, failOnDataLoss.toString)
  // Thread the caller-supplied options through the builder one by one.
  val configuredReader = options.foldLeft(baseReader) { case (builder, (key, value)) =>
    builder.option(key, value)
  }

  val keyedValues = configuredReader
    .load()
    .selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
    .as[(String, String)]
  // Every record value is parsed as an Int and incremented by one.
  val incremented = keyedValues.map(record => record._2.toInt + 1)

  testStream(incremented)(
    AddPulsarData(Set(topic), 4, 5, 6), // Add data when stream is stopped
    StopStream,
    StartStream(),
    CheckAnswer(2, 3, 4, 5, 6, 7),
    AddPulsarData(Set(topic), 7, 8),
    StopStream,
    StartStream(),
    CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9),
    AddPulsarData(Set(topic), 9, 10, 11, 12, 13, 14, 15, 16),
    StopStream,
    StartStream(),
    CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17)
  )
}
174+
175+
/**
 * Reads `topic` starting from a timestamp taken 10 seconds before the test begins and checks
 * that both the messages published up front and the data added afterwards are returned.
 *
 * @param topic topic that messages are published to
 * @param addPartitions not used by this helper
 * @param failOnDataLoss value written to `FailOnDataLossOptionKey`
 * @param options extra reader options, applied after the defaults set here
 */
private def testFromTime(
    topic: String,
    addPartitions: Boolean,
    failOnDataLoss: Boolean,
    options: (String, String)*): Unit = {

  // Start time lies 10 s in the past, i.e. before any message below is published.
  val time0 = System.currentTimeMillis() - 10000

  sendMessages(topic, (1 to 3).map(_.toString).toArray)
  require(getLatestOffsets(Set(topic)).size === 1)

  // Builds the streaming Dataset that starts reading at timestamp `ts`.
  // The start time comes from the `ts` argument rather than the captured `time0`, so the
  // helper honours whatever timestamp it is called with.
  def dfAfter(ts: Long) = {
    val baseReader = spark.readStream
      .format("pulsar")
      .option(StartingTime, ts)
      .option(ServiceUrlOptionKey, serviceUrl)
      .option(FailOnDataLossOptionKey, failOnDataLoss.toString)
    // Thread the caller-supplied options through the builder one by one.
    val configuredReader = options.foldLeft(baseReader) { case (builder, (key, value)) =>
      builder.option(key, value)
    }

    val keyedValues = configuredReader
      .load()
      .selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]
    // Every record value is parsed as an Int and incremented by one.
    keyedValues.map(record => record._2.toInt + 1)
  }

  testStream(dfAfter(time0))(
    AddPulsarData(Set(topic), 7, 8, 9),
    StopStream,
    StartStream(),
    CheckAnswer(2, 3, 4, 8, 9, 10)
  )
}
209+
210+
/**
 * Reads `topic` starting from the message id of the fourth published message and checks the
 * accumulated output after each of two add-data / stop / restart rounds.
 *
 * @param topic topic that messages are published to
 * @param failOnDataLoss value written to `FailOnDataLossOptionKey`
 * @param options extra reader options, applied after the defaults set here
 */
private def testFromSpecificOffsets(
    topic: String,
    failOnDataLoss: Boolean,
    options: (String, String)*): Unit = {

  val payloads = Array(
    //  0,   1,   2, 3, 4, 5,  6,  7,  8
    -20, -21, -22, 1, 2, 3, 10, 11, 12).map(_.toString)
  // Keep only the second component of each pair returned by sendMessages.
  val mids = sendMessages(topic, payloads, None).map(_._2)

  // Starting offsets: index 3 is the entry for payload "1".
  val s1 = JsonUtils.topicOffsets(Map(topic -> mids(3)))

  val baseReader = spark.readStream
    .format("pulsar")
    .option(StartingOffsetsOptionKey, s1)
    .option(ServiceUrlOptionKey, serviceUrl)
    .option(FailOnDataLossOptionKey, failOnDataLoss.toString)
  // Thread the caller-supplied options through the builder one by one.
  val configuredReader = options.foldLeft(baseReader) { case (builder, (key, value)) =>
    builder.option(key, value)
  }

  val keyedValues = configuredReader
    .load()
    .selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
    .as[(String, String)]
  // Values are parsed as Int without any further transformation.
  val parsed = keyedValues.map(record => record._2.toInt)

  testStream(parsed)(
    AddPulsarData(Set(topic), 7),
    StopStream,
    StartStream(),
    CheckAnswer(1, 2, 3, 10, 11, 12, 7),
    AddPulsarData(Set(topic), 30, 31, 32, 33, 34),
    StopStream,
    StartStream(),
    CheckAnswer(1, 2, 3, 10, 11, 12, 7, 30, 31, 32, 33, 34)
  )
}
247+
}

0 commit comments

Comments
 (0)