Skip to content

Commit 83d0dda

Browse files
authored
dataconnect(chore): refactor cache database management logic out of network layer (#8211)
1 parent e313666 commit 83d0dda

12 files changed

Lines changed: 1559 additions & 136 deletions

File tree

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.firebase.dataconnect.core
18+
19+
import com.google.firebase.dataconnect.core.LoggerGlobals.Logger
20+
import com.google.firebase.dataconnect.core.LoggerGlobals.debug
21+
import com.google.firebase.dataconnect.sqlite.DataConnectCacheDatabase
22+
import com.google.firebase.dataconnect.util.ObjectLifecycleManager
23+
import java.io.File
24+
import kotlinx.coroutines.CoroutineDispatcher
25+
26+
internal class DataConnectCache(
27+
private val dbFile: File?,
28+
val maxAge: kotlin.time.Duration,
29+
private val cpuDispatcher: CoroutineDispatcher,
30+
private val logger: Logger,
31+
) : ObjectLifecycleManager<DataConnectCacheDatabase>(cpuDispatcher, logger) {
32+
33+
val maxAgeProto: com.google.protobuf.Duration =
34+
maxAge.toComponents { seconds, nanos ->
35+
com.google.protobuf.Duration.newBuilder().setSeconds(seconds).setNanos(nanos).build()
36+
}
37+
38+
override fun create() =
39+
DataConnectCacheDatabase(
40+
dbFile,
41+
cpuDispatcher,
42+
Logger("DataConnectCacheDatabase").apply { debug { "created by ${logger.nameWithId}" } }
43+
)
44+
45+
override suspend fun initialize(instance: DataConnectCacheDatabase) {
46+
instance.initialize()
47+
}
48+
49+
override suspend fun destroy(instance: DataConnectCacheDatabase) {
50+
instance.close()
51+
}
52+
53+
override fun toString() = "DataConnectCache(dbFile=$dbFile, maxAge=$maxAge)"
54+
}

firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcRPCs.kt

Lines changed: 24 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,12 @@ import com.google.firebase.dataconnect.util.CoroutineUtils
3535
import com.google.firebase.dataconnect.util.GrpcBidiFlow
3636
import com.google.firebase.dataconnect.util.GrpcBidiFlowListenerMessageFormatter
3737
import com.google.firebase.dataconnect.util.IdStringGenerator
38-
import com.google.firebase.dataconnect.util.NullableReference
3938
import com.google.firebase.dataconnect.util.ProtoUtil.buildStructProto
4039
import com.google.firebase.dataconnect.util.ProtoUtil.toCompactString
4140
import com.google.firebase.dataconnect.util.ProtoUtil.toDataConnectPath
4241
import com.google.firebase.dataconnect.util.ProtoUtil.toStructProto
4342
import com.google.firebase.dataconnect.util.SuspendingLazy
4443
import com.google.firebase.dataconnect.util.copy
45-
import com.google.protobuf.Duration as DurationProto
4644
import com.google.protobuf.Struct
4745
import google.firebase.dataconnect.proto.ConnectorServiceGrpc
4846
import google.firebase.dataconnect.proto.ConnectorServiceGrpcKt
@@ -69,11 +67,9 @@ import io.grpc.Status
6967
import io.grpc.StatusException
7068
import io.grpc.StatusRuntimeException
7169
import io.grpc.android.AndroidChannelBuilder
72-
import java.io.File
7370
import java.lang.System.currentTimeMillis
7471
import java.util.concurrent.Executor
7572
import java.util.concurrent.TimeUnit
76-
import kotlin.time.Duration
7773
import kotlinx.coroutines.CancellationException
7874
import kotlinx.coroutines.CoroutineDispatcher
7975
import kotlinx.coroutines.asExecutor
@@ -95,7 +91,7 @@ internal class DataConnectGrpcRPCs(
9591
private val nonBlockingCoroutineDispatcher: CoroutineDispatcher,
9692
private val blockingCoroutineDispatcher: CoroutineDispatcher,
9793
private val grpcMetadata: DataConnectGrpcMetadata,
98-
private val cacheSettings: CacheSettings?,
94+
private val cache: DataConnectCache?,
9995
parentLogger: Logger,
10096
) {
10197
private val logger =
@@ -104,7 +100,7 @@ internal class DataConnectGrpcRPCs(
104100
"created by ${parentLogger.nameWithId} with" +
105101
" host=$host" +
106102
" sslEnabled=$sslEnabled" +
107-
" cacheSettings=$cacheSettings" +
103+
" cache=$cache" +
108104
" grpcMetadata=${grpcMetadata.instanceId}"
109105
}
110106
}
@@ -119,38 +115,6 @@ internal class DataConnectGrpcRPCs(
119115
private val mutex = Mutex()
120116
private var closed = false
121117

122-
data class CacheSettings(val dbFile: File?, val maxAge: Duration)
123-
124-
private data class CacheDbSettingsPair(
125-
val db: DataConnectCacheDatabase,
126-
val maxAge: DurationProto,
127-
)
128-
129-
// Use the non-main-thread CoroutineDispatcher to avoid blocking operations on the main thread.
130-
private val lazyCacheDb =
131-
SuspendingLazy(mutex = mutex, coroutineContext = blockingCoroutineDispatcher) {
132-
check(!closed) { "DataConnectGrpcRPCs ${logger.nameWithId} instance has been closed" }
133-
if (cacheSettings === null) {
134-
NullableReference()
135-
} else {
136-
logger.debug { "Creating GRPC ManagedChannel for host=$host sslEnabled=$sslEnabled" }
137-
138-
val maxAge =
139-
cacheSettings.maxAge.toComponents { seconds, nanos ->
140-
DurationProto.newBuilder().setSeconds(seconds).setNanos(nanos).build()
141-
}
142-
143-
val dbFile = cacheSettings.dbFile
144-
val cacheLogger = Logger("DataConnectCacheDatabase")
145-
cacheLogger.debug {
146-
"created by ${logger.nameWithId} with dbFile=$dbFile maxAge=${cacheSettings.maxAge}"
147-
}
148-
val cacheDb = DataConnectCacheDatabase(dbFile, cacheLogger)
149-
cacheDb.initialize()
150-
NullableReference(CacheDbSettingsPair(cacheDb, maxAge))
151-
}
152-
}
153-
154118
// Use the non-main-thread CoroutineDispatcher to avoid blocking operations on the main thread.
155119
private val lazyGrpcChannel =
156120
SuspendingLazy(mutex = mutex, coroutineContext = blockingCoroutineDispatcher) {
@@ -238,28 +202,20 @@ internal class DataConnectGrpcRPCs(
238202
}
239203

240204
private class QueryCacheInfo(
241-
val cacheDb: DataConnectCacheDatabase,
205+
val cache: DataConnectCache,
242206
val authUid: AuthUid?,
243207
val queryId: QueryId,
244-
val maxAge: DurationProto,
245208
)
246209

247-
private suspend fun queryCacheInfo(
210+
private suspend fun DataConnectCache.queryCacheInfo(
248211
authToken: DataConnectAuth.GetAuthTokenResult?,
249212
request: ExecuteQueryRequest,
250-
): QueryCacheInfo? {
213+
): QueryCacheInfo {
251214
val queryId =
252215
withContext(nonBlockingCoroutineDispatcher) {
253216
calculateQueryId(request.operationName, request.variables)
254217
}
255-
return lazyCacheDb.get().ref?.let { (cacheDb, maxAge) ->
256-
QueryCacheInfo(
257-
cacheDb,
258-
authUid = authToken?.authUid,
259-
queryId = queryId,
260-
maxAge = maxAge,
261-
)
262-
}
218+
return QueryCacheInfo(this, authToken?.authUid, queryId)
263219
}
264220

265221
suspend fun executeQuery(
@@ -282,7 +238,7 @@ internal class DataConnectGrpcRPCs(
282238
it.build()
283239
}
284240

285-
val cacheInfo = queryCacheInfo(authToken, request)
241+
val cacheInfo = cache?.queryCacheInfo(authToken, request)
286242
if (cacheInfo == null && fetchPolicy == FetchPolicy.CACHE_ONLY) {
287243
throw CachedDataNotFoundException(
288244
"FetchPolicy.CACHE_ONLY cannot be used because local caching is not configured. " +
@@ -335,14 +291,16 @@ internal class DataConnectGrpcRPCs(
335291
)
336292

337293
cacheInfo?.run {
338-
cacheDb.insertQueryResult(
339-
authUid,
340-
queryId,
341-
queryData = response.data,
342-
maxAge = maxAge,
343-
currentTimeMillis = currentTimeMillis(),
344-
getEntityIdForPath = response.getEntityIdForPathFunction(),
345-
)
294+
cache
295+
.open()
296+
.insertQueryResult(
297+
authUid,
298+
queryId,
299+
queryData = response.data,
300+
maxAge = cache.maxAgeProto,
301+
currentTimeMillis = currentTimeMillis(),
302+
getEntityIdForPath = response.getEntityIdForPathFunction(),
303+
)
346304
}
347305
}
348306

@@ -368,7 +326,8 @@ internal class DataConnectGrpcRPCs(
368326
else -> DataConnectCacheDatabase.GetQueryResultResult.Stale::class
369327
}
370328

371-
val cachedResult = cacheDb.getQueryResult(authUid, queryId, currentTimeMillis(), staleResult)
329+
val cachedResult =
330+
cache.open().getQueryResult(authUid, queryId, currentTimeMillis(), staleResult)
372331

373332
val cachedData: Struct? =
374333
when (cachedResult) {
@@ -607,37 +566,14 @@ internal class DataConnectGrpcRPCs(
607566
suspend fun close() {
608567
logger.debug { "close()" }
609568
mutex.withLock { closed = true }
610-
connectCoroutineScope.cancel("DataConnectGrpcRPCs.close() called [xn8dqn8dzm]")
611569

612-
val grpcChannel = lazyGrpcChannel.initializedValueOrNull
613-
val cacheDb = lazyCacheDb.initializedValueOrNull?.ref
614-
615-
if (grpcChannel === null && cacheDb === null) {
616-
connectCoroutineScope.coroutineContext.job.join()
617-
return
618-
}
570+
connectCoroutineScope.cancel("DataConnectGrpcRPCs.close() called [xn8dqn8dzm]")
619571

620-
// Avoid blocking the calling thread by running potentially-blocking code on the dispatcher
621-
// given to the constructor, which should have similar semantics to [Dispatchers.IO].
622-
val grpcChannelShutdownResult: Result<*>
623-
val cacheDbCloseResult: Result<*>
624-
withContext(blockingCoroutineDispatcher) {
625-
grpcChannelShutdownResult = runCatching {
626-
grpcChannel?.shutdownNow()
627-
grpcChannel?.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)
572+
lazyGrpcChannel.initializedValueOrNull?.let { grpcChannel ->
573+
withContext(blockingCoroutineDispatcher) {
574+
grpcChannel.shutdownNow()
575+
grpcChannel.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)
628576
}
629-
cacheDbCloseResult = runCatching { cacheDb?.db?.close() }
630-
}
631-
632-
// Bundle together any exceptions that were thrown.
633-
val exceptions =
634-
listOf(
635-
grpcChannelShutdownResult,
636-
cacheDbCloseResult,
637-
)
638-
.mapNotNull { it.exceptionOrNull() }
639-
if (exceptions.isNotEmpty()) {
640-
throw exceptions.first().apply { exceptions.drop(1).forEach { addSuppressed(it) } }
641577
}
642578

643579
connectCoroutineScope.coroutineContext.job.join()

0 commit comments

Comments
 (0)