Skip to content

Commit dbb523f

Browse files
authored
dataconnect(feat): Realtime query results now update the local cache (#8220)
1 parent df09c49 commit dbb523f

7 files changed

Lines changed: 96 additions & 51 deletions

File tree

firebase-dataconnect/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
- [fixed] Queries executed with FetchPolicy.CACHE_ONLY now fail, as expected,
1313
if local caching is not enabled, instead of behaving like SERVER_ONLY.
1414
([#8214](https://github.com/firebase/firebase-android-sdk/pull/8214))
15+
- [changed] Realtime query results now update the local cache as query
16+
results are received.
17+
([#8220](https://github.com/firebase/firebase-android-sdk/pull/8220))
1518

1619
# 17.2.2
1720

firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectBidiConnectStream.kt

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ internal class DataConnectBidiConnectStream(
9595
connectionStateFlow.value = SubscriptionEvent.Connected(event)
9696
}
9797
}
98-
.filterIsInstance<GrpcBidiFlow.Event.Message<StreamResponseProto>>()
98+
.filterIsInstance<GrpcBidiFlow.Event.Message<StreamResponseProto, AuthUid?>>()
9999
.map(SubscriptionEvent::Message)
100100
.onCompletion { throwable ->
101101
connectionStateFlow.value = SubscriptionEvent.Disconnected
@@ -178,11 +178,11 @@ internal class DataConnectBidiConnectStream(
178178
.transform { messageOrSubscribe ->
179179
when (messageOrSubscribe) {
180180
MessageOrSubscribe.Subscribed -> sendSubscribeOrResume()
181-
is MessageOrSubscribe.Message -> emit(messageOrSubscribe.message)
181+
is MessageOrSubscribe.Message -> emit(messageOrSubscribe)
182182
}
183183
}
184-
.filter { it.requestId == requestId }
185-
.mapNotNull { it.toExecuteResponse() }
184+
.filter { it.message.requestId == requestId }
185+
.mapNotNull { it.message.toExecuteResponse(it.authUid) }
186186

187187
// Configure the returned flow to end gracefully when FirebaseDataConnect.close() is called.
188188
return merge(subscriptionFlow, scopeCompletedFlow).transformWhile {
@@ -201,9 +201,9 @@ internal class DataConnectBidiConnectStream(
201201
override fun toString() = "Subscribed"
202202
}
203203

204-
class Message(val message: StreamResponseProto) : MessageOrSubscribe {
205-
constructor(event: SubscriptionEvent.Message) : this(event.message)
206-
override fun toString() = "Message(message=${message.toCompactString()})"
204+
class Message(val authUid: AuthUid?, val message: StreamResponseProto) : MessageOrSubscribe {
205+
constructor(event: SubscriptionEvent.Message) : this(event.authUid, event.message)
206+
override fun toString() = "Message(authUid=$authUid, message=${message.toCompactString()})"
207207
}
208208
}
209209

@@ -251,11 +251,14 @@ internal class DataConnectBidiConnectStream(
251251
/**
252252
* Represents the application-level response to a GraphQL execution request.
253253
*
254+
* @property authUid The Firebase Auth UID of the Firebase user under whose credentials the query
255+
* was executed, or `null` if no Firebase user was logged in.
254256
* @property data The data payload returned by the GraphQL query or mutation.
255257
* @property errors The errors related to the execution of the operation.
256258
* @property extensions Additional metadata or properties related to the execution.
257259
*/
258260
class ExecuteResponse(
261+
val authUid: AuthUid?,
259262
val data: Struct?,
260263
val errors: List<GraphqlErrorProto>,
261264
val extensions: List<DataConnectPropertiesProto>,
@@ -267,12 +270,17 @@ internal class DataConnectBidiConnectStream(
267270

268271
private sealed interface SubscriptionEvent {
269272

270-
class Message(val connectionId: String, val message: StreamResponseProto) : SubscriptionEvent {
273+
class Message(
274+
val connectionId: String,
275+
val authUid: AuthUid?,
276+
val message: StreamResponseProto,
277+
) : SubscriptionEvent {
271278
constructor(
272-
event: GrpcBidiFlow.Event.Message<StreamResponseProto>
273-
) : this(event.connectionId, event.message)
279+
event: GrpcBidiFlow.Event.Message<StreamResponseProto, AuthUid?>
280+
) : this(event.connectionId, event.connectionCookie, event.message)
274281
override fun toString() =
275-
"Message(connectionId=$connectionId, message=${message.toCompactString()})"
282+
"Message(connectionId=$connectionId, authUid=$authUid, " +
283+
"message=${message.toCompactString()})"
276284
}
277285

278286
sealed interface Connection : SubscriptionEvent
@@ -452,11 +460,12 @@ internal class DataConnectBidiConnectStream(
452460

453461
private companion object {
454462

455-
fun StreamResponseProto.toExecuteResponse(): ExecuteResponse? =
463+
fun StreamResponseProto.toExecuteResponse(authUid: AuthUid?): ExecuteResponse? =
456464
if (!hasData() && errorsCount == 0) {
457465
null
458466
} else {
459467
ExecuteResponse(
468+
authUid = authUid,
460469
data = if (hasData()) data else null,
461470
errors = if (errorsCount > 0) errorsList else emptyList(),
462471
extensions =

firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcRPCs.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -739,7 +739,7 @@ private fun ExecuteQueryResponse.getEntityIdForPathFunction(): GetEntityIdForPat
739739
}
740740

741741
@JvmName("getEntityIdForPathFunction_List_DataConnectProperties")
742-
private fun List<DataConnectProperties>.getEntityIdForPathFunction(): GetEntityIdForPathFunction? {
742+
internal fun List<DataConnectProperties>.getEntityIdForPathFunction(): GetEntityIdForPathFunction? {
743743
val entityIdByPath: Map<DataConnectPath, String>
744744
val entityIdsByPath: Map<DataConnectPath, List<String>>
745745

firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/FirebaseDataConnectImpl.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ internal class FirebaseDataConnectImpl(
194194
val grpcRPCs = createDataConnectGrpcRPCs(backendInfo, cache)
195195
val grpcClient = createDataConnectGrpcClient(grpcRPCs)
196196
val queryManager = createQueryManager(grpcClient)
197-
val realtimeQueryManager = createRealtimeQueryManager(grpcClient)
197+
val realtimeQueryManager = createRealtimeQueryManager(grpcClient, cache)
198198
State.Initialized(cache, grpcRPCs, grpcClient, queryManager, realtimeQueryManager)
199199
}
200200
is State.Initialized -> currentState
@@ -371,12 +371,16 @@ internal class FirebaseDataConnectImpl(
371371
return QueryManager(liveQueries)
372372
}
373373

374-
private fun createRealtimeQueryManager(grpcClient: DataConnectGrpcClient): RealtimeQueryManager =
374+
private fun createRealtimeQueryManager(
375+
grpcClient: DataConnectGrpcClient,
376+
cache: DataConnectCache?,
377+
): RealtimeQueryManager =
375378
RealtimeQueryManager(
376379
grpcClient = grpcClient,
377380
coroutineScope = coroutineScope,
378381
idStringGenerator = idStringGenerator,
379382
serialization = serialization,
383+
cache = cache,
380384
logger = Logger("RealtimeQueryManager").apply { debug { "created by ${logger.nameWithId}" } },
381385
)
382386

firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/querymgr/RealtimeQueryManager.kt

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,20 @@ import com.google.firebase.dataconnect.DataSource
2020
import com.google.firebase.dataconnect.FirebaseDataConnect.CallerSdkType
2121
import com.google.firebase.dataconnect.QueryRef
2222
import com.google.firebase.dataconnect.core.DataConnectBidiConnectStream
23+
import com.google.firebase.dataconnect.core.DataConnectCache
2324
import com.google.firebase.dataconnect.core.DataConnectGrpcClient
2425
import com.google.firebase.dataconnect.core.DataConnectSerialization
2526
import com.google.firebase.dataconnect.core.Logger
2627
import com.google.firebase.dataconnect.core.LoggerGlobals.debug
28+
import com.google.firebase.dataconnect.core.QueryId
29+
import com.google.firebase.dataconnect.core.calculateQueryId
30+
import com.google.firebase.dataconnect.core.getEntityIdForPathFunction
31+
import com.google.firebase.dataconnect.sqlite.GetEntityIdForPathFunction
2732
import com.google.firebase.dataconnect.util.CoroutineUtils.createChildSupervisorScope
2833
import com.google.firebase.dataconnect.util.IdStringGenerator
29-
import com.google.firebase.dataconnect.util.ImmutableByteArray
30-
import com.google.firebase.dataconnect.util.ProtoUtil.calculateSha512
3134
import com.google.firebase.dataconnect.util.update
3235
import com.google.protobuf.Struct
36+
import java.lang.System.currentTimeMillis
3337
import java.util.concurrent.atomic.AtomicReference
3438
import kotlinx.coroutines.CoroutineName
3539
import kotlinx.coroutines.CoroutineScope
@@ -40,6 +44,7 @@ import kotlinx.coroutines.awaitCancellation
4044
import kotlinx.coroutines.flow.Flow
4145
import kotlinx.coroutines.flow.emptyFlow
4246
import kotlinx.coroutines.flow.map
47+
import kotlinx.coroutines.flow.onEach
4348
import kotlinx.coroutines.job
4449
import kotlinx.coroutines.launch
4550
import kotlinx.coroutines.sync.Mutex
@@ -53,6 +58,7 @@ internal class RealtimeQueryManager(
5358
coroutineScope: CoroutineScope,
5459
private val idStringGenerator: IdStringGenerator,
5560
private val serialization: DataConnectSerialization,
61+
private val cache: DataConnectCache?,
5662
private val logger: Logger,
5763
) {
5864

@@ -135,24 +141,19 @@ internal class RealtimeQueryManager(
135141
operationName: String,
136142
variables: Struct,
137143
): Flow<DataConnectGrpcClient.OperationResult> {
138-
// calculateSha512() is a CPU intensive operation that should NOT be performed on the main
144+
// calculateQueryId() is a CPU intensive operation that should NOT be performed on the main
139145
// thread. This is the first reason why this method assumes it's running in this.coroutineScope.
140-
val queryId = variables.calculateSha512(preamble = operationName)
146+
val queryId = calculateQueryId(operationName, variables)
141147

142148
// Acquiring the lock by an arbitrary thread could result in priority inversion. This is the
143149
// second reason why this method assumes it's running in this.coroutineScope: control over the
144150
// thread that acquires the lock.
145151
mutex.withLock {
146152
return flowByQueryId.getOrPut(queryId) {
147-
val executeResponseFlow = stream.subscribe(requestId, operationName, variables)
148-
149-
executeResponseFlow.map { executeResponse ->
150-
DataConnectGrpcClient.OperationResult(
151-
data = executeResponse.data,
152-
errors = executeResponse.errors,
153-
source = DataSource.SERVER,
154-
)
155-
}
153+
stream
154+
.subscribe(requestId, operationName, variables)
155+
.updateCache(cache, queryId)
156+
.mapToOperationResponse()
156157
}
157158
}
158159
}
@@ -212,8 +213,7 @@ internal class RealtimeQueryManager(
212213

213214
class Connected(val stream: DataConnectBidiConnectStream) : State {
214215
val mutex = Mutex()
215-
val flowByQueryId:
216-
MutableMap<ImmutableByteArray, Flow<DataConnectGrpcClient.OperationResult>> =
216+
val flowByQueryId: MutableMap<QueryId, Flow<DataConnectGrpcClient.OperationResult>> =
217217
mutableMapOf()
218218
override fun toString() = "Connected"
219219
}
@@ -242,3 +242,37 @@ internal suspend fun <Data, Variables> RealtimeQueryManager.subscribe(
242242
queryRef.dataSerializersModule,
243243
queryRef.variablesSerializersModule,
244244
)
245+
246+
private fun Flow<DataConnectBidiConnectStream.ExecuteResponse>.mapToOperationResponse():
247+
Flow<DataConnectGrpcClient.OperationResult> = map { executeResponse ->
248+
DataConnectGrpcClient.OperationResult(
249+
data = executeResponse.data,
250+
errors = executeResponse.errors,
251+
source = DataSource.SERVER,
252+
)
253+
}
254+
255+
private fun Flow<DataConnectBidiConnectStream.ExecuteResponse>.updateCache(
256+
cache: DataConnectCache?,
257+
queryId: QueryId
258+
): Flow<DataConnectBidiConnectStream.ExecuteResponse> = onEach { response ->
259+
val cacheDb = cache?.open() ?: return@onEach
260+
val data = response.data ?: return@onEach // null indicates error
261+
cacheDb.insertQueryResult(
262+
response.authUid,
263+
queryId,
264+
data,
265+
cache.maxAgeProto,
266+
currentTimeMillis(),
267+
response.getEntityIdForPathFunction(),
268+
)
269+
}
270+
271+
@JvmName("getEntityIdForPathFunction_DataConnectBidiConnectStream_ExecuteResponse")
272+
private fun DataConnectBidiConnectStream.ExecuteResponse.getEntityIdForPathFunction():
273+
GetEntityIdForPathFunction? =
274+
if (extensions.isEmpty()) {
275+
null
276+
} else {
277+
extensions.getEntityIdForPathFunction()
278+
}

firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/util/GrpcBidiFlow.kt

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -71,41 +71,34 @@ import kotlinx.coroutines.withContext
7171
*/
7272
internal object GrpcBidiFlow {
7373

74-
/**
75-
* Represents events emitted by the [Flow] created by [GrpcBidiFlow.create].
76-
*
77-
* @property connectionId The "connectionId" to uniquely identify a connection to the remote
78-
* server, especially for correlation with invocations of [Listener.collectStarted].
79-
*/
80-
sealed class Event<in RequestT, out ResponseT, out ConnectionCookie>(val connectionId: String) {
74+
/** Represents events emitted by the [Flow] created by [GrpcBidiFlow.create]. */
75+
sealed class Event<in RequestT, out ResponseT, out ConnectionCookie>(
76+
val connectionId: String,
77+
val connectionCookie: ConnectionCookie,
78+
) {
8179
/**
8280
* Emitted once when the gRPC flow collection starts.
8381
*
8482
* It provides a [SendChannel] that the caller can use to send requests to the server. Closing
8583
* this channel will half-close the gRPC stream from the client side.
86-
*
87-
* @param connectionId The unique identifier associated with this particular flow collection.
88-
* @property outgoingRequests The channel to send requests to the server.
8984
*/
9085
class ConnectionInfo<in RequestT, out ConnectionCookie>(
9186
connectionId: String,
92-
val connectionCookie: ConnectionCookie,
87+
connectionCookie: ConnectionCookie,
9388
val outgoingRequests: SendChannel<RequestT>,
94-
) : Event<RequestT, Nothing, ConnectionCookie>(connectionId) {
89+
) : Event<RequestT, Nothing, ConnectionCookie>(connectionId, connectionCookie) {
9590
override fun toString() =
9691
"ConnectionInfo(connectionId=$connectionId, connectionCookie=$connectionCookie)"
9792
}
9893

99-
/**
100-
* Emitted when a response message is received from the server.
101-
*
102-
* @property message The response message received from the server.
103-
*/
104-
class Message<out ResponseT>(
94+
/** Emitted when a response message is received from the server. */
95+
class Message<out ResponseT, out ConnectionCookie>(
10596
connectionId: String,
97+
connectionCookie: ConnectionCookie,
10698
val message: ResponseT,
107-
) : Event<Any?, ResponseT, Nothing>(connectionId) {
108-
override fun toString() = "Message(message=$message)"
99+
) : Event<Any?, ResponseT, ConnectionCookie>(connectionId, connectionCookie) {
100+
override fun toString() =
101+
"Message(connectionId=$connectionId, connectionCookie=$connectionCookie, message=$message)"
109102
}
110103
}
111104

@@ -311,7 +304,7 @@ internal object GrpcBidiFlow {
311304
clientCall.request(1)
312305
for (response in responses) {
313306
collectionListener?.receivedMessage(response)
314-
emit(Event.Message(connectionId, response))
307+
emit(Event.Message(connectionId, connectionCookie, response))
315308
clientCall.request(1)
316309
}
317310
collectionListener?.receivingMessagesComplete()

firebase-dataconnect/src/test/kotlin/com/google/firebase/dataconnect/querymgr/RealtimeQueryManagerUnitTest.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ class RealtimeQueryManagerUnitTest {
6868
coroutineScope = backgroundScope,
6969
idStringGenerator = IdStringGenerator(rs.random),
7070
serialization = DataConnectSerialization(StandardTestDispatcher(testScheduler)),
71+
cache = null,
7172
logger = newMockLogger("s78hgm6fff")
7273
)
7374

@@ -104,6 +105,7 @@ class RealtimeQueryManagerUnitTest {
104105
coroutineScope = backgroundScope,
105106
idStringGenerator = IdStringGenerator(rs.random),
106107
serialization = DataConnectSerialization(StandardTestDispatcher(testScheduler)),
108+
cache = null,
107109
logger = newMockLogger("yzrpk2m6tt")
108110
)
109111

0 commit comments

Comments
 (0)