Skip to content

Commit 534ef3b

Browse files
authored
dataconnect(test): ConnectRPCIntegrationTest.kt added with its first test (#8097)
1 parent 5ed3b92 commit 534ef3b

10 files changed

Lines changed: 439 additions & 82 deletions

File tree

firebase-dataconnect/androidTestutil/src/main/kotlin/com/google/firebase/dataconnect/testutil/DataConnectIntegrationTestBase.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ abstract class DataConnectIntegrationTestBase {
3939

4040
@get:Rule val dataConnectFactory = TestDataConnectFactory(firebaseAppFactory)
4141

42+
@get:Rule val cleanups = CleanupsRule()
43+
4244
@get:Rule(order = Int.MIN_VALUE) val randomSeedTestRule = RandomSeedTestRule()
4345

4446
val rs: RandomSource by randomSeedTestRule.rs
Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.firebase.dataconnect
18+
19+
import app.cash.turbine.test
20+
import com.google.firebase.dataconnect.testutil.DataConnectIntegrationTestBase
21+
import com.google.firebase.dataconnect.testutil.createGrpcManagedChannel
22+
import com.google.firebase.dataconnect.testutil.schemas.RealtimeConnector
23+
import com.google.firebase.dataconnect.testutil.schemas.RealtimeConnector.GetStringByKeyQuery
24+
import com.google.firebase.dataconnect.testutil.schemas.RealtimeConnector.InsertStringMutation
25+
import com.google.firebase.dataconnect.util.ProtoUtil.decodeFromStruct
26+
import com.google.firebase.dataconnect.util.ProtoUtil.encodeToStruct
27+
import google.firebase.dataconnect.proto.ConnectorStreamServiceGrpcKt.ConnectorStreamServiceCoroutineStub
28+
import google.firebase.dataconnect.proto.ExecuteRequest
29+
import google.firebase.dataconnect.proto.StreamRequest
30+
import google.firebase.dataconnect.proto.StreamResponse
31+
import io.kotest.assertions.assertSoftly
32+
import io.kotest.assertions.withClue
33+
import io.kotest.matchers.nulls.shouldNotBeNull
34+
import io.kotest.matchers.result.shouldBeSuccess
35+
import io.kotest.matchers.shouldBe
36+
import io.kotest.property.Arb
37+
import io.kotest.property.arbitrary.Codepoint
38+
import io.kotest.property.arbitrary.az
39+
import io.kotest.property.arbitrary.map
40+
import io.kotest.property.arbitrary.string
41+
import kotlinx.coroutines.channels.Channel
42+
import kotlinx.coroutines.channels.SendChannel
43+
import kotlinx.coroutines.flow.Flow
44+
import kotlinx.coroutines.flow.consumeAsFlow
45+
import kotlinx.coroutines.test.runTest
46+
import org.junit.Test
47+
48+
class ConnectRPCIntegrationTest : DataConnectIntegrationTestBase() {
49+
50+
@Test
51+
fun responseIsReceivedForExecuteQueryRequest() = runTest {
52+
val requestId = requestIdArb().sample()
53+
val name = nameArb().sample()
54+
val (connector, outgoingRequests, incomingResponses) = connect()
55+
val key = connector.insertString(name)
56+
outgoingRequests.sendInitRequest(requestId = "init", connector.resourceName)
57+
58+
incomingResponses.test {
59+
outgoingRequests.sendNonBlockingOrThrow(
60+
StreamRequest.newBuilder().let { streamRequest ->
61+
streamRequest.setRequestId(requestId)
62+
streamRequest.setExecute(
63+
ExecuteRequest.newBuilder().let { executeRequest ->
64+
executeRequest.setOperationName(GetStringByKeyQuery.OPERATION_NAME)
65+
executeRequest.setVariables(encodeToStruct(GetStringByKeyQuery.Variables(key)))
66+
executeRequest.build()
67+
}
68+
)
69+
streamRequest.build()
70+
}
71+
)
72+
73+
val streamResponse = awaitItem()
74+
75+
streamResponse.shouldBeGetStringByKeyDataWithName(requestId = requestId, name = name)
76+
}
77+
}
78+
79+
@Test
80+
fun responseIsReceivedForExecuteMutationRequest() = runTest {
81+
val requestId = requestIdArb().sample()
82+
val name = nameArb().sample()
83+
val (connector, outgoingRequests, incomingResponses) = connect()
84+
outgoingRequests.sendInitRequest(requestId = "init", connector.resourceName)
85+
86+
incomingResponses.test {
87+
outgoingRequests.sendNonBlockingOrThrow(
88+
StreamRequest.newBuilder().let { streamRequest ->
89+
streamRequest.setRequestId(requestId)
90+
streamRequest.setExecute(
91+
ExecuteRequest.newBuilder().let { executeRequest ->
92+
executeRequest.setOperationName(InsertStringMutation.OPERATION_NAME)
93+
executeRequest.setVariables(encodeToStruct(InsertStringMutation.Variables(name)))
94+
executeRequest.build()
95+
}
96+
)
97+
streamRequest.build()
98+
}
99+
)
100+
101+
val streamResponse = awaitItem()
102+
103+
val key = streamResponse.shouldBeInsertStringData(requestId = requestId)
104+
connector.getString(key).shouldNotBeNull().name shouldBe name
105+
}
106+
}
107+
108+
/**
109+
* Establishes a connection to the Data Connect `Connect` RPC and returns the streams for
110+
* communication.
111+
*/
112+
private fun connect(): ConnectionStreams {
113+
val connector = RealtimeConnector.getInstance(dataConnectFactory)
114+
val grpcManagedChannel = createGrpcManagedChannel(connector.dataConnect)
115+
val outgoingRequests = Channel<StreamRequest>(Channel.UNLIMITED)
116+
val stub = ConnectorStreamServiceCoroutineStub(grpcManagedChannel)
117+
val incomingResponses = stub.connect(outgoingRequests.consumeAsFlow())
118+
return ConnectionStreams(connector, outgoingRequests, incomingResponses)
119+
}
120+
121+
/**
122+
* Convenience extension function on [Arb] that gets a non-edge-case value using the [rs] property
123+
* of the [DataConnectIntegrationTestBase] superclass for the randomness source.
124+
*/
125+
private fun <T> Arb<T>.sample(): T = sample(rs).value
126+
}
127+
128+
/**
129+
* Creates and returns an [Arb] that generates strings that are suitable for, and recognizable as,
130+
* "request IDs" in [StreamRequest] messages.
131+
*/
132+
private fun requestIdArb(): Arb<String> = Arb.string(size = 4, Codepoint.az()).map { "rid$it" }
133+
134+
/**
135+
* Creates and returns an [Arb] that generates strings that are suitable for, and recognizable as,
136+
* "name" values for [RealtimeConnector.InsertStringMutation.Variables.name] and
137+
* [RealtimeConnector.UpdateStringMutation.Variables.name].
138+
*/
139+
private fun nameArb(): Arb<String> = Arb.string(size = 4, Codepoint.az()).map { "nam$it" }
140+
141+
/** Exception thrown by [sendNonBlockingOrThrow] if sending fails. */
142+
private class SendChannelTrySendUnsuccessfulException(message: String, cause: Throwable?) :
143+
Exception(message, cause)
144+
145+
/**
146+
* Sends an element to receiver [SendChannel] in a non-blocking way, throwing an exception if
147+
* sending was unsuccessful.
148+
*
149+
* @throws SendChannelTrySendUnsuccessfulException if [SendChannel.trySend] returns a failed result.
150+
*/
151+
private fun <T> SendChannel<T>.sendNonBlockingOrThrow(element: T) {
152+
val result = trySend(element)
153+
if (!result.isSuccess) {
154+
throw SendChannelTrySendUnsuccessfulException(
155+
"trySend() failed with isClosed=${result.isClosed}",
156+
result.exceptionOrNull()
157+
)
158+
}
159+
}
160+
161+
private data class ConnectionStreams(
162+
val connector: RealtimeConnector,
163+
val outgoingRequests: SendChannel<StreamRequest>,
164+
val incomingResponses: Flow<StreamResponse>,
165+
)
166+
167+
/**
168+
* Sends an initialization request to the receiver [ConnectionStreams.outgoingRequests].
169+
*
170+
* @param requestId The "request ID" to send in the outgoing message.
171+
* @param connectorResourceName The "connector resource name" to send in the outgoing message.
172+
*/
173+
private fun SendChannel<StreamRequest>.sendInitRequest(
174+
requestId: String,
175+
connectorResourceName: String
176+
) {
177+
val streamRequest =
178+
StreamRequest.newBuilder().let {
179+
it.setRequestId(requestId)
180+
it.setName(connectorResourceName)
181+
it.build()
182+
}
183+
sendNonBlockingOrThrow(streamRequest)
184+
}
185+
186+
/**
187+
* Assertion to verify that a [StreamResponse] matches the expected "RealtimeString_GetByKey" data.
188+
*
189+
* @param requestId The expected request ID.
190+
* @param name The expected name in the data.
191+
*/
192+
private fun StreamResponse.shouldBeGetStringByKeyDataWithName(requestId: String, name: String) {
193+
withClue("StreamResponse.shouldBeGetStringByKeyDataWithName") {
194+
assertSoftly {
195+
val data = shouldBeSuccess<GetStringByKeyQuery.Data>(requestId = requestId)
196+
withClue("data") { data.item.shouldNotBeNull().name shouldBe name }
197+
}
198+
}
199+
}
200+
201+
/**
202+
* Assertion to verify that a [StreamResponse] matches the expected "RealtimeString_Insert" data.
203+
*
204+
* @param requestId The expected request ID.
205+
*/
206+
private fun StreamResponse.shouldBeInsertStringData(requestId: String): RealtimeConnector.Key =
207+
withClue("StreamResponse.shouldBeInsertStringData") {
208+
assertSoftly { shouldBeSuccess<InsertStringMutation.Data>(requestId = requestId).key }
209+
}
210+
211+
/**
212+
* Assertion to verify that a [StreamResponse] matches the expected "RealtimeString_Insert" data.
213+
*
214+
* @param requestId The expected request ID.
215+
* @return the [Data] decoded from the receiver.
216+
*/
217+
private inline fun <reified Data> StreamResponse.shouldBeSuccess(requestId: String): Data {
218+
withClue("requestId") { this.requestId shouldBe requestId }
219+
withClue("errorsCount") { this.errorsCount shouldBe 0 }
220+
221+
val dataDecodeResult = runCatching { decodeFromStruct<Data>(this.data) }
222+
val data = withClue("decodeFromStruct(data)") { dataDecodeResult.shouldBeSuccess() }
223+
224+
return data
225+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.firebase.dataconnect.testutil
18+
19+
import com.google.firebase.dataconnect.FirebaseDataConnect
20+
import com.google.firebase.dataconnect.core.DataConnectGrpcRPCs
21+
import com.google.firebase.dataconnect.core.FirebaseDataConnectInternal
22+
import com.google.firebase.dataconnect.core.Logger
23+
import io.grpc.ManagedChannel
24+
import io.mockk.mockk
25+
import java.util.concurrent.TimeUnit
26+
import kotlinx.coroutines.Dispatchers
27+
import kotlinx.coroutines.asExecutor
28+
29+
/**
30+
* Information about the gRPC configuration used by this [FirebaseDataConnect] instance.
31+
*
32+
* This is primarily intended for use in tests to inspect or replicate the gRPC connection
33+
* parameters, such as the host and whether SSL is enabled.
34+
*/
35+
internal val FirebaseDataConnect.grpcTestInfo: DataConnectGrpcRPCs.TestInfo
36+
get() = (this as FirebaseDataConnectInternal).grpcRPCs.testInfo
37+
38+
/**
39+
* Creates a gRPC [ManagedChannel] that connects to the same remote host as the given
40+
* [FirebaseDataConnect] instance, and registers the [ManagedChannel] for automatic cleanup after
41+
* the test completes.
42+
*
43+
* This is useful for integration tests that need to interact with the gRPC layer directly, for
44+
* example to verify interceptors or low-level communication.
45+
*
46+
* @param dataConnect The [FirebaseDataConnect] instance whose gRPC configuration (host, SSL, etc.)
47+
* to use to configure the new [ManagedChannel].
48+
* @return A new [ManagedChannel] configured to connect to the same backend as [dataConnect].
49+
*/
50+
internal fun DataConnectIntegrationTestBase.createGrpcManagedChannel(
51+
dataConnect: FirebaseDataConnect
52+
): ManagedChannel {
53+
val (context, host, sslEnabled) = dataConnect.grpcTestInfo
54+
55+
val managedChannel =
56+
DataConnectGrpcRPCs.createGrpcManagedChannel(
57+
context,
58+
host,
59+
sslEnabled,
60+
Dispatchers.IO.asExecutor(),
61+
mockk<Logger>(name = "TestGrpcManagedChannel", relaxed = true),
62+
)
63+
64+
cleanups.register {
65+
managedChannel.shutdownNow()
66+
managedChannel.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
67+
}
68+
69+
return managedChannel
70+
}

firebase-dataconnect/src/androidTest/kotlin/com/google/firebase/dataconnect/testutil/schemas/RealtimeConnector.kt

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,18 @@ package com.google.firebase.dataconnect.testutil.schemas
1818

1919
import com.google.firebase.dataconnect.ConnectorConfig
2020
import com.google.firebase.dataconnect.FirebaseDataConnect
21+
import com.google.firebase.dataconnect.core.FirebaseDataConnectInternal
2122
import com.google.firebase.dataconnect.serializers.UUIDSerializer
2223
import com.google.firebase.dataconnect.testutil.TestDataConnectFactory
2324
import java.util.UUID
2425
import kotlinx.serialization.Serializable
2526
import kotlinx.serialization.serializer
2627

27-
class RealtimeConnector private constructor(val dataConnect: FirebaseDataConnect) {
28+
class RealtimeConnector private constructor(dataConnectInternal: FirebaseDataConnectInternal) {
29+
30+
val dataConnect: FirebaseDataConnect = dataConnectInternal
31+
32+
val resourceName: String = dataConnectInternal.connectorResourceName
2833

2934
val getStringByKey = GetStringByKeyQuery(this)
3035

@@ -34,19 +39,19 @@ class RealtimeConnector private constructor(val dataConnect: FirebaseDataConnect
3439

3540
val deleteString = DeleteStringMutation(this)
3641

37-
suspend fun getString(key: Key) = getStringByKey.execute(key)
42+
suspend fun getString(key: Key): GetStringByKeyQuery.Data.Item? = getStringByKey.execute(key)
3843

39-
suspend fun insertString(name: String) = insertString.execute(name)
44+
suspend fun insertString(name: String): Key = insertString.execute(name)
4045

41-
suspend fun updateString(key: Key, name: String) = updateString.execute(key, name)
46+
suspend fun updateString(key: Key, name: String): Unit = updateString.execute(key, name)
4247

43-
suspend fun deleteString(key: Key) = deleteString.execute(key)
48+
suspend fun deleteString(key: Key): Unit = deleteString.execute(key)
4449

4550
class GetStringByKeyQuery(val connector: RealtimeConnector) {
4651

47-
suspend fun execute(key: Key) = execute(Variables(key))
52+
suspend fun execute(key: Key): Data.Item? = execute(Variables(key))
4853

49-
suspend fun execute(variables: Variables) = queryRef(variables).execute().data.item
54+
suspend fun execute(variables: Variables): Data.Item? = queryRef(variables).execute().data.item
5055

5156
fun queryRef(variables: Variables) =
5257
connector.dataConnect.query(OPERATION_NAME, variables, serializer<Data>(), serializer())
@@ -67,9 +72,9 @@ class RealtimeConnector private constructor(val dataConnect: FirebaseDataConnect
6772
@Serializable data class Variables(val name: String)
6873
@Serializable data class Data(val key: Key)
6974

70-
suspend fun execute(name: String) = execute(Variables(name = name))
75+
suspend fun execute(name: String): Key = execute(Variables(name = name))
7176

72-
suspend fun execute(variables: Variables) = mutationRef(variables).execute().data.key
77+
suspend fun execute(variables: Variables): Key = mutationRef(variables).execute().data.key
7378

7479
fun mutationRef(variables: Variables) =
7580
connector.dataConnect.mutation(OPERATION_NAME, variables, serializer<Data>(), serializer())
@@ -82,7 +87,7 @@ class RealtimeConnector private constructor(val dataConnect: FirebaseDataConnect
8287
class UpdateStringMutation(val connector: RealtimeConnector) {
8388
@Serializable data class Variables(val key: Key, val name: String)
8489

85-
suspend fun execute(key: Key, name: String) = execute(Variables(key = key, name = name))
90+
suspend fun execute(key: Key, name: String): Unit = execute(Variables(key = key, name = name))
8691

8792
suspend fun execute(variables: Variables) {
8893
mutationRef(variables).execute()
@@ -99,7 +104,7 @@ class RealtimeConnector private constructor(val dataConnect: FirebaseDataConnect
99104
class DeleteStringMutation(val connector: RealtimeConnector) {
100105
@Serializable data class Variables(val key: Key)
101106

102-
suspend fun execute(key: Key) = execute(Variables(key = key))
107+
suspend fun execute(key: Key): Unit = execute(Variables(key = key))
103108

104109
suspend fun execute(variables: Variables) {
105110
mutationRef(variables).execute()
@@ -125,7 +130,7 @@ class RealtimeConnector private constructor(val dataConnect: FirebaseDataConnect
125130

126131
fun getInstance(dataConnectFactory: TestDataConnectFactory): RealtimeConnector {
127132
val dataConnect = dataConnectFactory.newInstance(config)
128-
return RealtimeConnector(dataConnect)
133+
return RealtimeConnector(dataConnect as FirebaseDataConnectInternal)
129134
}
130135
}
131136
}

0 commit comments

Comments
 (0)