Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions firebase-dataconnect/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
- [fixed] Queries executed with FetchPolicy.CACHE_ONLY now fail, as expected,
if local caching is not enabled, instead of behaving like SERVER_ONLY.
([#8214](https://github.com/firebase/firebase-android-sdk/pull/8214))
- [changed] Realtime query results now update the local cache as query
results are received.
([#8220](https://github.com/firebase/firebase-android-sdk/pull/8220))

# 17.2.2

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ internal class DataConnectBidiConnectStream(
connectionStateFlow.value = SubscriptionEvent.Connected(event)
}
}
.filterIsInstance<GrpcBidiFlow.Event.Message<StreamResponseProto>>()
.filterIsInstance<GrpcBidiFlow.Event.Message<StreamResponseProto, AuthUid?>>()
.map(SubscriptionEvent::Message)
.onCompletion { throwable ->
connectionStateFlow.value = SubscriptionEvent.Disconnected
Expand Down Expand Up @@ -178,11 +178,11 @@ internal class DataConnectBidiConnectStream(
.transform { messageOrSubscribe ->
when (messageOrSubscribe) {
MessageOrSubscribe.Subscribed -> sendSubscribeOrResume()
is MessageOrSubscribe.Message -> emit(messageOrSubscribe.message)
is MessageOrSubscribe.Message -> emit(messageOrSubscribe)
}
}
.filter { it.requestId == requestId }
.mapNotNull { it.toExecuteResponse() }
.filter { it.message.requestId == requestId }
.mapNotNull { it.message.toExecuteResponse(it.authUid) }

// Configure the returned flow to end gracefully when FirebaseDataConnect.close() is called.
return merge(subscriptionFlow, scopeCompletedFlow).transformWhile {
Expand All @@ -201,9 +201,9 @@ internal class DataConnectBidiConnectStream(
override fun toString() = "Subscribed"
}

class Message(val message: StreamResponseProto) : MessageOrSubscribe {
constructor(event: SubscriptionEvent.Message) : this(event.message)
override fun toString() = "Message(message=${message.toCompactString()})"
class Message(val authUid: AuthUid?, val message: StreamResponseProto) : MessageOrSubscribe {
constructor(event: SubscriptionEvent.Message) : this(event.authUid, event.message)
override fun toString() = "Message(authUid=$authUid, message=${message.toCompactString()})"
}
}

Expand Down Expand Up @@ -251,11 +251,14 @@ internal class DataConnectBidiConnectStream(
/**
* Represents the application-level response to a GraphQL execution request.
*
* @property authUid The Firebase Auth UID of the Firebase user under whose credentials the query
* was executed, or `null` if no Firebase user was logged in.
* @property data The data payload returned by the GraphQL query or mutation.
* @property errors The errors related to the execution of the operation.
* @property extensions Additional metadata or properties related to the execution.
*/
class ExecuteResponse(
val authUid: AuthUid?,
val data: Struct?,
val errors: List<GraphqlErrorProto>,
val extensions: List<DataConnectPropertiesProto>,
Expand All @@ -267,12 +270,17 @@ internal class DataConnectBidiConnectStream(

private sealed interface SubscriptionEvent {

class Message(val connectionId: String, val message: StreamResponseProto) : SubscriptionEvent {
class Message(
val connectionId: String,
val authUid: AuthUid?,
val message: StreamResponseProto,
) : SubscriptionEvent {
constructor(
event: GrpcBidiFlow.Event.Message<StreamResponseProto>
) : this(event.connectionId, event.message)
event: GrpcBidiFlow.Event.Message<StreamResponseProto, AuthUid?>
) : this(event.connectionId, event.connectionCookie, event.message)
override fun toString() =
"Message(connectionId=$connectionId, message=${message.toCompactString()})"
"Message(connectionId=$connectionId, authUid=$authUid, " +
"message=${message.toCompactString()})"
}

sealed interface Connection : SubscriptionEvent
Expand Down Expand Up @@ -452,11 +460,12 @@ internal class DataConnectBidiConnectStream(

private companion object {

fun StreamResponseProto.toExecuteResponse(): ExecuteResponse? =
fun StreamResponseProto.toExecuteResponse(authUid: AuthUid?): ExecuteResponse? =
if (!hasData() && errorsCount == 0) {
null
} else {
ExecuteResponse(
authUid = authUid,
data = if (hasData()) data else null,
errors = if (errorsCount > 0) errorsList else emptyList(),
extensions =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ private fun ExecuteQueryResponse.getEntityIdForPathFunction(): GetEntityIdForPat
}

@JvmName("getEntityIdForPathFunction_List_DataConnectProperties")
private fun List<DataConnectProperties>.getEntityIdForPathFunction(): GetEntityIdForPathFunction? {
internal fun List<DataConnectProperties>.getEntityIdForPathFunction(): GetEntityIdForPathFunction? {
val entityIdByPath: Map<DataConnectPath, String>
val entityIdsByPath: Map<DataConnectPath, List<String>>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ internal class FirebaseDataConnectImpl(
val grpcRPCs = createDataConnectGrpcRPCs(backendInfo, cache)
val grpcClient = createDataConnectGrpcClient(grpcRPCs)
val queryManager = createQueryManager(grpcClient)
val realtimeQueryManager = createRealtimeQueryManager(grpcClient)
val realtimeQueryManager = createRealtimeQueryManager(grpcClient, cache)
State.Initialized(cache, grpcRPCs, grpcClient, queryManager, realtimeQueryManager)
}
is State.Initialized -> currentState
Expand Down Expand Up @@ -371,12 +371,16 @@ internal class FirebaseDataConnectImpl(
return QueryManager(liveQueries)
}

private fun createRealtimeQueryManager(grpcClient: DataConnectGrpcClient): RealtimeQueryManager =
private fun createRealtimeQueryManager(
grpcClient: DataConnectGrpcClient,
cache: DataConnectCache?,
): RealtimeQueryManager =
RealtimeQueryManager(
grpcClient = grpcClient,
coroutineScope = coroutineScope,
idStringGenerator = idStringGenerator,
serialization = serialization,
cache = cache,
logger = Logger("RealtimeQueryManager").apply { debug { "created by ${logger.nameWithId}" } },
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@ import com.google.firebase.dataconnect.DataSource
import com.google.firebase.dataconnect.FirebaseDataConnect.CallerSdkType
import com.google.firebase.dataconnect.QueryRef
import com.google.firebase.dataconnect.core.DataConnectBidiConnectStream
import com.google.firebase.dataconnect.core.DataConnectCache
import com.google.firebase.dataconnect.core.DataConnectGrpcClient
import com.google.firebase.dataconnect.core.DataConnectSerialization
import com.google.firebase.dataconnect.core.Logger
import com.google.firebase.dataconnect.core.LoggerGlobals.debug
Comment thread
dconeybe marked this conversation as resolved.
import com.google.firebase.dataconnect.core.QueryId
import com.google.firebase.dataconnect.core.calculateQueryId
import com.google.firebase.dataconnect.core.getEntityIdForPathFunction
import com.google.firebase.dataconnect.sqlite.GetEntityIdForPathFunction
import com.google.firebase.dataconnect.util.CoroutineUtils.createChildSupervisorScope
import com.google.firebase.dataconnect.util.IdStringGenerator
import com.google.firebase.dataconnect.util.ImmutableByteArray
import com.google.firebase.dataconnect.util.ProtoUtil.calculateSha512
import com.google.firebase.dataconnect.util.update
import com.google.protobuf.Struct
import java.lang.System.currentTimeMillis
import java.util.concurrent.atomic.AtomicReference
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
Expand All @@ -40,6 +44,7 @@ import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.job
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
Expand All @@ -53,6 +58,7 @@ internal class RealtimeQueryManager(
coroutineScope: CoroutineScope,
private val idStringGenerator: IdStringGenerator,
private val serialization: DataConnectSerialization,
private val cache: DataConnectCache?,
private val logger: Logger,
) {

Expand Down Expand Up @@ -135,24 +141,19 @@ internal class RealtimeQueryManager(
operationName: String,
variables: Struct,
): Flow<DataConnectGrpcClient.OperationResult> {
// calculateSha512() is a CPU intensive operation that should NOT be performed on the main
// calculateQueryId() is a CPU intensive operation that should NOT be performed on the main
// thread. This is the first reason why this method assumes it's running in this.coroutineScope.
val queryId = variables.calculateSha512(preamble = operationName)
val queryId = calculateQueryId(operationName, variables)

// Acquiring the lock by an arbitrary thread could result in priority inversion. This is the
// second reason why this method assumes it's running in this.coroutineScope: control over the
// thread that acquires the lock.
mutex.withLock {
return flowByQueryId.getOrPut(queryId) {
val executeResponseFlow = stream.subscribe(requestId, operationName, variables)

executeResponseFlow.map { executeResponse ->
DataConnectGrpcClient.OperationResult(
data = executeResponse.data,
errors = executeResponse.errors,
source = DataSource.SERVER,
)
}
stream
.subscribe(requestId, operationName, variables)
.updateCache(cache, queryId)
Comment thread
dconeybe marked this conversation as resolved.
.mapToOperationResponse()
}
}
}
Expand Down Expand Up @@ -212,8 +213,7 @@ internal class RealtimeQueryManager(

class Connected(val stream: DataConnectBidiConnectStream) : State {
val mutex = Mutex()
val flowByQueryId:
MutableMap<ImmutableByteArray, Flow<DataConnectGrpcClient.OperationResult>> =
val flowByQueryId: MutableMap<QueryId, Flow<DataConnectGrpcClient.OperationResult>> =
mutableMapOf()
override fun toString() = "Connected"
}
Expand Down Expand Up @@ -242,3 +242,37 @@ internal suspend fun <Data, Variables> RealtimeQueryManager.subscribe(
queryRef.dataSerializersModule,
queryRef.variablesSerializersModule,
)

private fun Flow<DataConnectBidiConnectStream.ExecuteResponse>.mapToOperationResponse():
Flow<DataConnectGrpcClient.OperationResult> = map { executeResponse ->
DataConnectGrpcClient.OperationResult(
data = executeResponse.data,
errors = executeResponse.errors,
source = DataSource.SERVER,
)
}

private fun Flow<DataConnectBidiConnectStream.ExecuteResponse>.updateCache(
cache: DataConnectCache?,
queryId: QueryId
): Flow<DataConnectBidiConnectStream.ExecuteResponse> = onEach { response ->
val cacheDb = cache?.open() ?: return@onEach
val data = response.data ?: return@onEach // null indicates error
cacheDb.insertQueryResult(
response.authUid,
queryId,
data,
cache.maxAgeProto,
currentTimeMillis(),
response.getEntityIdForPathFunction(),
)
}
Comment thread
dconeybe marked this conversation as resolved.

@JvmName("getEntityIdForPathFunction_DataConnectBidiConnectStream_ExecuteResponse")
private fun DataConnectBidiConnectStream.ExecuteResponse.getEntityIdForPathFunction():
GetEntityIdForPathFunction? =
if (extensions.isEmpty()) {
null
} else {
extensions.getEntityIdForPathFunction()
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,41 +71,34 @@ import kotlinx.coroutines.withContext
*/
internal object GrpcBidiFlow {

/**
* Represents events emitted by the [Flow] created by [GrpcBidiFlow.create].
*
* @property connectionId The "connectionId" to uniquely identify a connection to the remote
* server, especially for correlation with invocations of [Listener.collectStarted].
*/
sealed class Event<in RequestT, out ResponseT, out ConnectionCookie>(val connectionId: String) {
/** Represents events emitted by the [Flow] created by [GrpcBidiFlow.create]. */
sealed class Event<in RequestT, out ResponseT, out ConnectionCookie>(
val connectionId: String,
val connectionCookie: ConnectionCookie,
) {
/**
* Emitted once when the gRPC flow collection starts.
*
* It provides a [SendChannel] that the caller can use to send requests to the server. Closing
* this channel will half-close the gRPC stream from the client side.
*
* @param connectionId The unique identifier associated with this particular flow collection.
* @property outgoingRequests The channel to send requests to the server.
*/
class ConnectionInfo<in RequestT, out ConnectionCookie>(
connectionId: String,
val connectionCookie: ConnectionCookie,
connectionCookie: ConnectionCookie,
val outgoingRequests: SendChannel<RequestT>,
) : Event<RequestT, Nothing, ConnectionCookie>(connectionId) {
) : Event<RequestT, Nothing, ConnectionCookie>(connectionId, connectionCookie) {
override fun toString() =
"ConnectionInfo(connectionId=$connectionId, connectionCookie=$connectionCookie)"
}

/**
* Emitted when a response message is received from the server.
*
* @property message The response message received from the server.
*/
class Message<out ResponseT>(
/** Emitted when a response message is received from the server. */
class Message<out ResponseT, out ConnectionCookie>(
connectionId: String,
connectionCookie: ConnectionCookie,
val message: ResponseT,
) : Event<Any?, ResponseT, Nothing>(connectionId) {
override fun toString() = "Message(message=$message)"
) : Event<Any?, ResponseT, ConnectionCookie>(connectionId, connectionCookie) {
override fun toString() =
"Message(connectionId=$connectionId, connectionCookie=$connectionCookie, message=$message)"
}
}

Expand Down Expand Up @@ -311,7 +304,7 @@ internal object GrpcBidiFlow {
clientCall.request(1)
for (response in responses) {
collectionListener?.receivedMessage(response)
emit(Event.Message(connectionId, response))
emit(Event.Message(connectionId, connectionCookie, response))
clientCall.request(1)
}
collectionListener?.receivingMessagesComplete()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class RealtimeQueryManagerUnitTest {
coroutineScope = backgroundScope,
idStringGenerator = IdStringGenerator(rs.random),
serialization = DataConnectSerialization(StandardTestDispatcher(testScheduler)),
cache = null,
logger = newMockLogger("s78hgm6fff")
)

Expand Down Expand Up @@ -104,6 +105,7 @@ class RealtimeQueryManagerUnitTest {
coroutineScope = backgroundScope,
idStringGenerator = IdStringGenerator(rs.random),
serialization = DataConnectSerialization(StandardTestDispatcher(testScheduler)),
cache = null,
logger = newMockLogger("yzrpk2m6tt")
)

Expand Down
Loading