Skip to content

Commit 1101aa4

Browse files
authored
dataconnect(fix): Fix duplicate cache/server events from QuerySubscription.flow (#8240)
1 parent a1a58a7 commit 1101aa4

3 files changed

Lines changed: 185 additions & 9 deletions

File tree

firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/RealtimeQueryRefIntegrationTest.kt

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,24 @@ import com.google.firebase.dataconnect.testutil.property.arbitrary.pair
2222
import com.google.firebase.dataconnect.testutil.registerDataConnectKotestPrinters
2323
import com.google.firebase.dataconnect.testutil.schemas.RealtimeConnector
2424
import com.google.firebase.dataconnect.testutil.schemas.RealtimeConnector.GetStringByKeyQuery
25+
import io.kotest.assertions.withClue
26+
import io.kotest.common.ExperimentalKotest
27+
import io.kotest.matchers.collections.shouldBeIn
28+
import io.kotest.matchers.nulls.shouldBeNull
29+
import io.kotest.matchers.result.shouldBeSuccess
2530
import io.kotest.matchers.shouldBe
2631
import io.kotest.matchers.types.shouldBeSameInstanceAs
2732
import io.kotest.property.Arb
33+
import io.kotest.property.EdgeConfig
34+
import io.kotest.property.PropTestConfig
35+
import io.kotest.property.ShrinkingMode
2836
import io.kotest.property.arbitrary.Codepoint
2937
import io.kotest.property.arbitrary.az
3038
import io.kotest.property.arbitrary.map
39+
import io.kotest.property.arbitrary.of
3140
import io.kotest.property.arbitrary.string
41+
import io.kotest.property.checkAll
42+
import kotlin.time.Duration
3243
import kotlinx.coroutines.test.runTest
3344
import kotlinx.serialization.serializer
3445
import org.junit.Before
@@ -138,4 +149,82 @@ class RealtimeQueryRefIntegrationTest : DataConnectIntegrationTestBase() {
138149
awaitItem().result.getOrThrow().dataSource shouldBe DataSource.SERVER
139150
}
140151
}
152+
153+
@Test
154+
fun emittedResultsUpdateTheLocalCacheForTheUnderlyingQuery() = runTest {
155+
val dataConnect =
156+
dataConnectFactory.newInstance(
157+
RealtimeConnector.config,
158+
CacheSettings(maxAge = Duration.INFINITE)
159+
)
160+
val connector = RealtimeConnector.getInstance(dataConnect)
161+
val fetchPolicyArb = Arb.of(QueryRef.FetchPolicy.PREFER_CACHE, QueryRef.FetchPolicy.CACHE_ONLY)
162+
163+
checkAll(propTestConfig, nameArb.pair(), fetchPolicyArb) { (name1, name2), fetchPolicy ->
164+
val key = connector.insertString(name1)
165+
166+
val ref =
167+
dataConnect.query(
168+
GetStringByKeyQuery.OPERATION_NAME,
169+
GetStringByKeyQuery.Variables(key),
170+
serializer<GetStringByKeyQuery.Data>(),
171+
serializer(),
172+
)
173+
174+
suspend fun verifyCachedData(
175+
clue: String,
176+
fetchPolicy: QueryRef.FetchPolicy,
177+
expectedNames: List<String>?,
178+
) {
179+
withClue(clue) {
180+
val result = ref.execute(fetchPolicy)
181+
result.dataSource shouldBe DataSource.CACHE
182+
if (expectedNames == null) {
183+
result.data.item.shouldBeNull()
184+
} else {
185+
result.data shouldBeIn expectedNames.map(GetStringByKeyQuery::Data)
186+
}
187+
}
188+
}
189+
190+
suspend fun verifyCachedData(
191+
clue: String,
192+
fetchPolicy: QueryRef.FetchPolicy,
193+
vararg expectedNames: String,
194+
) = verifyCachedData(clue, fetchPolicy, expectedNames.toList())
195+
196+
ref.subscribe().flow.test {
197+
suspend fun awaitItemAndCheckName(clue: String, expectedName: String?) {
198+
val queryResult = awaitItem().result.shouldBeSuccess()
199+
val expectedData =
200+
GetStringByKeyQuery.Data(GetStringByKeyQuery.Data.Item.fromNameOrNull(expectedName))
201+
check(queryResult.data == expectedData) {
202+
"$clue internal test error cvyx4f32ft: queryResult=$queryResult, " +
203+
"but expected its data to be $expectedData"
204+
}
205+
}
206+
207+
awaitItemAndCheckName("awaitCheck1", name1)
208+
verifyCachedData("cacheCheck1", fetchPolicy, name1)
209+
210+
connector.updateString(key, name2)
211+
verifyCachedData("cacheCheck2", fetchPolicy, name1, name2)
212+
awaitItemAndCheckName("awaitCheck2", name2)
213+
verifyCachedData("cacheCheck3", fetchPolicy, name2)
214+
215+
connector.deleteString(key)
216+
verifyCachedData("cacheCheck4", fetchPolicy, name2)
217+
awaitItemAndCheckName("awaitCheck3", null)
218+
verifyCachedData("cacheCheck5", fetchPolicy, null)
219+
}
220+
}
221+
}
141222
}
223+
224+
@OptIn(ExperimentalKotest::class)
225+
private val propTestConfig =
226+
PropTestConfig(
227+
iterations = 10,
228+
edgeConfig = EdgeConfig(edgecasesGenerationProbability = 0.2),
229+
shrinkingMode = ShrinkingMode.Off,
230+
)

firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/testutil/schemas/RealtimeConnector.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,12 @@ class RealtimeConnector private constructor(dataConnectInternal: FirebaseDataCon
7474
@Serializable
7575
data class Data(val item: Item?) {
7676
constructor(name: String) : this(Item(name))
77-
@Serializable data class Item(val name: String)
77+
@Serializable
78+
data class Item(val name: String) {
79+
companion object {
80+
fun fromNameOrNull(name: String?): Item? = if (name == null) null else Item(name)
81+
}
82+
}
7883
}
7984

8085
companion object {

firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/QuerySubscriptionImpl.kt

Lines changed: 90 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import com.google.firebase.dataconnect.DataSource
1919
import com.google.firebase.dataconnect.QuerySubscription
2020
import com.google.firebase.dataconnect.QuerySubscriptionResult
2121
import com.google.firebase.dataconnect.querymgr.subscribe
22+
import com.google.firebase.dataconnect.sqlite.DataConnectCacheDatabase.SqliteSequenceNumber
2223
import com.google.firebase.dataconnect.sqlite.SqliteSequencedReference
2324
import com.google.firebase.dataconnect.util.SequencedReference
2425
import com.google.firebase.dataconnect.util.throwIfCancellationException
@@ -27,6 +28,8 @@ import kotlinx.coroutines.channels.SendChannel
2728
import kotlinx.coroutines.flow.Flow
2829
import kotlinx.coroutines.flow.channelFlow
2930
import kotlinx.coroutines.launch
31+
import kotlinx.coroutines.sync.Mutex
32+
import kotlinx.coroutines.sync.withLock
3033

3134
internal class QuerySubscriptionImpl<Data, Variables>(
3235
override val query: QueryRefImpl<Data, Variables>,
@@ -78,34 +81,113 @@ internal class QuerySubscriptionImpl<Data, Variables>(
7881
private inner class ResultSender(
7982
val channel: SendChannel<QuerySubscriptionResultImpl>,
8083
) {
84+
private val mutex = Mutex()
85+
private var lastEmittedSqliteSequenceNumber: SqliteSequenceNumber? = null
86+
8187
suspend fun onNonRealtimeUpdate(
8288
event: SequencedReference<Result<SourcedData<Data>>>,
8389
) {
84-
val (source, _, data) = event.ref.getOrNull() ?: return
85-
emit(data, source)
90+
val (source, sqliteSequenceNumber, data) = event.ref.getOrNull() ?: return
91+
mutex.withLock {
92+
if (shouldEmitNonRealtime(source, sqliteSequenceNumber, lastEmittedSqliteSequenceNumber)) {
93+
emit(data, source, sqliteSequenceNumber)
94+
}
95+
}
8696
}
8797

8898
suspend fun onRealtimeUpdate(
8999
event: Result<SqliteSequencedReference<Data>>,
90100
) {
91101
event.throwIfCancellationException()
92-
val queryResult = event.map { query.QueryResultImpl(it.ref, DataSource.SERVER) }
93-
emit(queryResult)
102+
103+
mutex.withLock {
104+
if (shouldEmitRealtime(event, lastEmittedSqliteSequenceNumber)) {
105+
val queryResult = event.map { query.QueryResultImpl(it.ref, DataSource.SERVER) }
106+
emit(queryResult, event.getOrNull()?.sqliteSequenceNumber)
107+
}
108+
}
94109
}
95110

96111
private suspend fun emit(
97112
queryResult: Result<QueryRefImpl<Data, Variables>.QueryResultImpl>,
113+
dataSqliteSequenceNumber: SqliteSequenceNumber?,
98114
) {
99115
val subscriptionResult = QuerySubscriptionResultImpl(query, queryResult)
100116
channel.send(subscriptionResult)
117+
if (dataSqliteSequenceNumber != null) {
118+
lastEmittedSqliteSequenceNumber = dataSqliteSequenceNumber
119+
}
101120
}
102121

103-
private suspend fun emit(data: Data, source: DataSource) {
104-
emit(query.QueryResultImpl(data, source))
122+
private suspend fun emit(
123+
data: Data,
124+
source: DataSource,
125+
dataSqliteSequenceNumber: SqliteSequenceNumber?,
126+
) {
127+
emit(query.QueryResultImpl(data, source), dataSqliteSequenceNumber)
105128
}
106129

107-
private suspend fun emit(queryResult: QueryRefImpl<Data, Variables>.QueryResultImpl) {
108-
emit(Result.success(queryResult))
130+
private suspend fun emit(
131+
queryResult: QueryRefImpl<Data, Variables>.QueryResultImpl,
132+
dataSqliteSequenceNumber: SqliteSequenceNumber?,
133+
) {
134+
emit(Result.success(queryResult), dataSqliteSequenceNumber)
109135
}
110136
}
111137
}
138+
139+
private fun shouldEmitNonRealtime(
140+
dataSource: DataSource,
141+
dataSqliteSequenceNumber: SqliteSequenceNumber?,
142+
lastEmittedSqliteSequenceNumber: SqliteSequenceNumber?,
143+
): Boolean {
144+
// Emit the data if `lastEmittedSqliteSequenceNumber` is null, as that indicates that there have
145+
// been no results emitted yet. Regardless of the age of the data, we may as well emit something.
146+
if (lastEmittedSqliteSequenceNumber == null) {
147+
return true
148+
}
149+
150+
// Return an appropriate value when `dataSqliteSequenceNumber` is null. The meaning of a null
151+
// value depends on the source of the data.
152+
if (dataSqliteSequenceNumber == null) {
153+
return when (dataSource) {
154+
// Do not emit the data because it's so old that it was saved to cache by an older version of
155+
// the SDK that lacked SqliteSequenceNumber support; therefore, the data cannot possibly be
156+
// newer than the data previously emitted with `lastEmittedSqliteSequenceNumber`.
157+
DataSource.CACHE -> false
158+
// Emit the data because either saving the data to the cache failed, or caching is not enabled
159+
// at all. Either way, there is no way to tell if this data is older than the data previously
160+
// emitted with `lastEmittedSqliteSequenceNumber`, so assume that it is newer.
161+
DataSource.SERVER -> true
162+
}
163+
}
164+
165+
// Emit the data if, and only if, it is newer than the data previously emitted with
166+
// `lastEmittedSqliteSequenceNumber`.
167+
return dataSqliteSequenceNumber > lastEmittedSqliteSequenceNumber
168+
}
169+
170+
private fun shouldEmitRealtime(
171+
event: Result<SqliteSequencedReference<*>>,
172+
lastEmittedSqliteSequenceNumber: SqliteSequenceNumber?,
173+
): Boolean {
174+
// Emit failures unconditionally, since they do not have an associated SqliteSequenceNumbers.
175+
if (event.isFailure) {
176+
return true
177+
}
178+
179+
// Emit the data if `lastEmittedSqliteSequenceNumber` is null, as that indicates that there have
180+
// been no results emitted yet. Regardless of the age of the data, we may as well emit something.
181+
if (lastEmittedSqliteSequenceNumber == null) {
182+
return true
183+
}
184+
185+
// Emit the data if its SqliteSequenceNumber is null because that means saving the data to the
186+
// cache failed. In this case there is no way to tell if this data is older than the data
187+
// previously emitted with `lastEmittedSqliteSequenceNumber`, so assume that it is newer.
188+
val dataSqliteSequenceNumber = event.getOrThrow().sqliteSequenceNumber ?: return true
189+
190+
// Emit the data if, and only if, it is newer than the data previously emitted with
191+
// `lastEmittedSqliteSequenceNumber`.
192+
return dataSqliteSequenceNumber > lastEmittedSqliteSequenceNumber
193+
}

0 commit comments

Comments
 (0)