Skip to content

Commit c97fcf5

Browse files
authored
dataconnect(chore): wire up RealtimeQuerySubscriptionImpl to basic realtime subscription functionality (#8144)
1 parent f9817b0 commit c97fcf5

12 files changed

Lines changed: 620 additions & 3 deletions

File tree

firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/DataConnectGrpcRPCsConnectIntegrationTest.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
* limitations under the License.
1515
*/
1616

17+
@file:OptIn(ExperimentalRealtimeQueries::class)
18+
1719
package com.google.firebase.dataconnect
1820

1921
import app.cash.turbine.test
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
@file:OptIn(ExperimentalRealtimeQueries::class)
18+
19+
package com.google.firebase.dataconnect
20+
21+
import app.cash.turbine.test
22+
import com.google.firebase.dataconnect.testutil.DataConnectIntegrationTestBase
23+
import com.google.firebase.dataconnect.testutil.property.arbitrary.pair
24+
import com.google.firebase.dataconnect.testutil.realtimeQuery
25+
import com.google.firebase.dataconnect.testutil.registerDataConnectKotestPrinters
26+
import com.google.firebase.dataconnect.testutil.schemas.RealtimeConnector
27+
import com.google.firebase.dataconnect.testutil.schemas.RealtimeConnector.GetStringByKeyQuery
28+
import io.kotest.matchers.shouldBe
29+
import io.kotest.matchers.types.shouldBeSameInstanceAs
30+
import io.kotest.property.Arb
31+
import io.kotest.property.arbitrary.Codepoint
32+
import io.kotest.property.arbitrary.az
33+
import io.kotest.property.arbitrary.map
34+
import io.kotest.property.arbitrary.string
35+
import kotlinx.coroutines.test.runTest
36+
import kotlinx.serialization.serializer
37+
import org.junit.Before
38+
import org.junit.Test
39+
40+
/**
41+
* Integration tests for the
42+
* [com.google.firebase.dataconnect.core.FirebaseDataConnectInternal.realtimeQuery] method and its
43+
* return value.
44+
*/
45+
class RealtimeQueryRefIntegrationTest : DataConnectIntegrationTestBase() {
46+
47+
private val nameArb = Arb.string(size = 4, Codepoint.az()).map { "name_$it" }
48+
49+
@Before
50+
fun registerPrinters() {
51+
registerDataConnectKotestPrinters()
52+
}
53+
54+
@Test
55+
fun emittedResultsHaveTheCorrectQueryProperty() = runTest {
56+
val dataConnect = dataConnectFactory.newInstance(RealtimeConnector.config)
57+
val connector = RealtimeConnector.getInstance(dataConnect)
58+
val (name1, name2) = nameArb.pair().sample()
59+
val key = connector.insertString(name1)
60+
61+
val ref =
62+
dataConnect.realtimeQuery(
63+
GetStringByKeyQuery.OPERATION_NAME,
64+
GetStringByKeyQuery.Variables(key),
65+
serializer<GetStringByKeyQuery.Data>(),
66+
serializer(),
67+
)
68+
69+
ref.subscribe().flow.test {
70+
awaitItem().query shouldBeSameInstanceAs ref
71+
connector.updateString(key, name2)
72+
awaitItem().query shouldBeSameInstanceAs ref
73+
connector.deleteString(key)
74+
awaitItem().query shouldBeSameInstanceAs ref
75+
}
76+
}
77+
78+
@Test
79+
fun emittedResultsHaveResultsWithTheCorrectQuery() = runTest {
80+
val dataConnect = dataConnectFactory.newInstance(RealtimeConnector.config)
81+
val connector = RealtimeConnector.getInstance(dataConnect)
82+
val (name1, name2) = nameArb.pair().sample()
83+
val key = connector.insertString(name1)
84+
85+
val ref =
86+
dataConnect.realtimeQuery(
87+
GetStringByKeyQuery.OPERATION_NAME,
88+
GetStringByKeyQuery.Variables(key),
89+
serializer<GetStringByKeyQuery.Data>(),
90+
serializer(),
91+
)
92+
93+
ref.subscribe().flow.test {
94+
awaitItem().result.getOrThrow().ref shouldBeSameInstanceAs ref
95+
connector.updateString(key, name2)
96+
awaitItem().result.getOrThrow().ref shouldBeSameInstanceAs ref
97+
connector.deleteString(key)
98+
awaitItem().result.getOrThrow().ref shouldBeSameInstanceAs ref
99+
}
100+
}
101+
102+
@Test
103+
fun emittedResultsHaveResultsWithTheCorrectData() = runTest {
104+
val dataConnect = dataConnectFactory.newInstance(RealtimeConnector.config)
105+
val connector = RealtimeConnector.getInstance(dataConnect)
106+
val (name1, name2) = nameArb.pair().sample()
107+
val key = connector.insertString(name1)
108+
109+
val ref =
110+
dataConnect.realtimeQuery(
111+
GetStringByKeyQuery.OPERATION_NAME,
112+
GetStringByKeyQuery.Variables(key),
113+
serializer<GetStringByKeyQuery.Data>(),
114+
serializer(),
115+
)
116+
117+
ref.subscribe().flow.test {
118+
awaitItem().result.getOrThrow().data shouldBe GetStringByKeyQuery.Data(name1)
119+
connector.updateString(key, name2)
120+
awaitItem().result.getOrThrow().data shouldBe GetStringByKeyQuery.Data(name2)
121+
connector.deleteString(key)
122+
awaitItem().result.getOrThrow().data shouldBe GetStringByKeyQuery.Data(null)
123+
}
124+
}
125+
126+
@Test
127+
fun emittedResultsHaveResultsWithTheCorrectDataSource() = runTest {
128+
val dataConnect = dataConnectFactory.newInstance(RealtimeConnector.config)
129+
val connector = RealtimeConnector.getInstance(dataConnect)
130+
val (name1, name2) = nameArb.pair().sample()
131+
val key = connector.insertString(name1)
132+
133+
val ref =
134+
dataConnect.realtimeQuery(
135+
GetStringByKeyQuery.OPERATION_NAME,
136+
GetStringByKeyQuery.Variables(key),
137+
serializer<GetStringByKeyQuery.Data>(),
138+
serializer(),
139+
)
140+
141+
ref.subscribe().flow.test {
142+
awaitItem().result.getOrThrow().dataSource shouldBe DataSource.SERVER
143+
connector.updateString(key, name2)
144+
awaitItem().result.getOrThrow().dataSource shouldBe DataSource.SERVER
145+
connector.deleteString(key)
146+
awaitItem().result.getOrThrow().dataSource shouldBe DataSource.SERVER
147+
}
148+
}
149+
}

firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/testutil/FirebaseDataConnectInternalExts.kt

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,12 @@
1616

1717
package com.google.firebase.dataconnect.testutil
1818

19+
import com.google.firebase.dataconnect.ExperimentalRealtimeQueries
1920
import com.google.firebase.dataconnect.FirebaseDataConnect
21+
import com.google.firebase.dataconnect.QueryRef
2022
import com.google.firebase.dataconnect.core.FirebaseDataConnectInternal
23+
import kotlinx.serialization.DeserializationStrategy
24+
import kotlinx.serialization.SerializationStrategy
2125

2226
suspend fun FirebaseDataConnect.awaitAuthReady() =
2327
(this as FirebaseDataConnectInternal).awaitAuthReady()
@@ -27,3 +31,19 @@ suspend fun FirebaseDataConnect.awaitAppCheckReady() =
2731

2832
internal val FirebaseDataConnect.dataConnectGrpcRPCs
2933
get() = (this as FirebaseDataConnectInternal).grpcRPCs
34+
35+
@ExperimentalRealtimeQueries
36+
internal fun <Data, Variables> FirebaseDataConnect.realtimeQuery(
37+
operationName: String,
38+
variables: Variables,
39+
dataDeserializer: DeserializationStrategy<Data>,
40+
variablesSerializer: SerializationStrategy<Variables>,
41+
optionsBuilder: (FirebaseDataConnect.QueryRefOptionsBuilder<Data, Variables>.() -> Unit)? = null,
42+
): QueryRef<Data, Variables> =
43+
(this as FirebaseDataConnectInternal).realtimeQuery(
44+
operationName,
45+
variables,
46+
dataDeserializer,
47+
variablesSerializer,
48+
optionsBuilder,
49+
)

firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/testutil/schemas/RealtimeConnector.kt

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import com.google.firebase.dataconnect.core.FirebaseDataConnectInternal
2323
import com.google.firebase.dataconnect.serializers.UUIDSerializer
2424
import com.google.firebase.dataconnect.testutil.DataConnectBackend
2525
import com.google.firebase.dataconnect.testutil.TestDataConnectFactory
26+
import io.kotest.assertions.print.print
2627
import java.util.UUID
2728
import kotlinx.serialization.Serializable
2829
import kotlinx.serialization.serializer
@@ -68,6 +69,7 @@ class RealtimeConnector private constructor(dataConnectInternal: FirebaseDataCon
6869

6970
@Serializable
7071
data class Data(val item: Item?) {
72+
constructor(name: String) : this(Item(name))
7173
@Serializable data class Item(val name: String)
7274
}
7375

@@ -143,5 +145,15 @@ class RealtimeConnector private constructor(dataConnectInternal: FirebaseDataCon
143145
val dataConnect = dataConnectFactory.newInstance(config, backend)
144146
return RealtimeConnector(dataConnect as FirebaseDataConnectInternal)
145147
}
148+
149+
fun getInstance(dataConnect: FirebaseDataConnect): RealtimeConnector {
150+
require(dataConnect.config == config) {
151+
"The given FirebaseDataConnect has a config that " +
152+
"does not match the config required for RealtimeConnector: " +
153+
"actual=${dataConnect.config.print().value}, " +
154+
"expected=${config.print().value}"
155+
}
156+
return RealtimeConnector(dataConnect as FirebaseDataConnectInternal)
157+
}
146158
}
147159
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.firebase.dataconnect
18+
19+
/**
20+
* Marks declarations in the Firebase Data Connect SDK that are part of the upcoming "realtime query
21+
* subscription updates" feature.
22+
*
23+
* During development of the "realtime query subscription updates" feature, the logic for it will be
24+
* included in the SDK but will only be visible via non-public methods. Once the feature is complete
25+
* and tested, the realtime logic will be merged into the existing [QueryRef] and
26+
* [QuerySubscription] classes, and this annotation and everything annotated by it will be deleted
27+
* or promoted to a bona fide part of the SDK.
28+
*/
29+
@MustBeDocumented
30+
@Retention(value = AnnotationRetention.BINARY)
31+
@RequiresOptIn(
32+
level = RequiresOptIn.Level.WARNING,
33+
message =
34+
"This declaration is part of the experimental, not-yet-released " +
35+
"\"realtime query subscription updates\" feature"
36+
)
37+
internal annotation class ExperimentalRealtimeQueries

firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectBidiConnectStream.kt

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.google.firebase.dataconnect.core
1818

19+
import com.google.firebase.dataconnect.ExperimentalRealtimeQueries
1920
import com.google.firebase.dataconnect.core.LoggerGlobals.debug
2021
import com.google.firebase.dataconnect.util.CoroutineUtils
2122
import com.google.firebase.dataconnect.util.NullableReference
@@ -63,6 +64,7 @@ import kotlinx.coroutines.job
6364
* lifecycle.
6465
* @param logger The [Logger] used for debug and error logging.
6566
*/
67+
@ExperimentalRealtimeQueries
6668
internal class DataConnectBidiConnectStream(
6769
outgoingRequests: SendChannel<StreamRequestProto>,
6870
incomingResponses: Flow<StreamResponseProto>,
@@ -194,8 +196,17 @@ internal class DataConnectBidiConnectStream(
194196
.transformWhile { incomingResponse ->
195197
when (incomingResponse) {
196198
is IncomingResponse.Subscribed -> {
197-
outgoingRequests.send(streamRequest)
198-
true
199+
val sendResult = outgoingRequests.trySend(streamRequest)
200+
when {
201+
sendResult.isSuccess -> true
202+
sendResult.isClosed -> false
203+
else ->
204+
error(
205+
"internal error xw3zdzycfq: outgoingRequests.trySend(streamRequest) " +
206+
"was unable to enqueue the streamRequest; this should never happen because " +
207+
"outgoingRequests is created with capacity=UNLIMITED (sendResult=$sendResult)"
208+
)
209+
}
199210
}
200211
is IncomingResponse.Message -> {
201212
if (incomingResponse.streamResponse.requestId != requestId) {

firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcClient.kt

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.google.firebase.dataconnect.core
1818

1919
import com.google.firebase.dataconnect.DataSource
20+
import com.google.firebase.dataconnect.ExperimentalRealtimeQueries
2021
import com.google.firebase.dataconnect.FirebaseDataConnect
2122
import com.google.firebase.dataconnect.QueryRef.FetchPolicy
2223
import com.google.firebase.dataconnect.core.DataConnectAppCheck.GetAppCheckTokenResult
@@ -26,6 +27,10 @@ import com.google.protobuf.Struct
2627
import google.firebase.dataconnect.proto.GraphqlError
2728
import io.grpc.Status
2829
import io.grpc.StatusException
30+
import kotlinx.coroutines.NonCancellable
31+
import kotlinx.coroutines.flow.Flow
32+
import kotlinx.coroutines.flow.flow
33+
import kotlinx.coroutines.withContext
2934

3035
internal class DataConnectGrpcClient(
3136
private val grpcRPCs: DataConnectGrpcRPCs,
@@ -93,6 +98,45 @@ internal class DataConnectGrpcClient(
9398
)
9499
}
95100

101+
@ExperimentalRealtimeQueries
102+
fun connect(
103+
requestId: String,
104+
operationName: String,
105+
variables: Struct,
106+
callerSdkType: FirebaseDataConnect.CallerSdkType,
107+
): Flow<OperationResult> = flow {
108+
val connection =
109+
grpcRPCs.retryOnGrpcUnauthenticatedError(requestId, "connect") { authToken, appCheckToken ->
110+
connect(
111+
requestId,
112+
callerSdkType,
113+
authToken,
114+
appCheckToken,
115+
)
116+
}
117+
118+
try {
119+
val flow =
120+
connection.subscribe(
121+
requestId = requestId,
122+
operationName = operationName,
123+
variables = variables,
124+
)
125+
126+
flow.collect { executeResponse: DataConnectBidiConnectStream.ExecuteResponse ->
127+
emit(
128+
OperationResult(
129+
data = executeResponse.data,
130+
errors = executeResponse.errors,
131+
source = DataSource.SERVER,
132+
)
133+
)
134+
}
135+
} finally {
136+
withContext(NonCancellable) { connection.close() }
137+
}
138+
}
139+
96140
private suspend inline fun <T, R> T.retryOnGrpcUnauthenticatedError(
97141
requestId: String,
98142
kotlinMethodName: String,

firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectGrpcRPCs.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import com.google.android.gms.security.ProviderInstaller
2222
import com.google.firebase.dataconnect.CachedDataNotFoundException
2323
import com.google.firebase.dataconnect.DataConnectPath
2424
import com.google.firebase.dataconnect.DataConnectPathSegment
25+
import com.google.firebase.dataconnect.ExperimentalRealtimeQueries
2526
import com.google.firebase.dataconnect.FirebaseDataConnect
2627
import com.google.firebase.dataconnect.QueryRef.FetchPolicy
2728
import com.google.firebase.dataconnect.core.DataConnectGrpcMetadata.Companion.toStructProto
@@ -385,6 +386,7 @@ internal class DataConnectGrpcRPCs(
385386
return cachedData?.let(ExecuteQueryResult::FromCache)
386387
}
387388

389+
@ExperimentalRealtimeQueries
388390
suspend fun connect(
389391
streamId: String,
390392
callerSdkType: FirebaseDataConnect.CallerSdkType,

0 commit comments

Comments
 (0)