1616
1717package com.google.firebase.dataconnect
1818
19+ import app.cash.turbine.Event
1920import app.cash.turbine.test
21+ import app.cash.turbine.turbineScope
2022import com.google.firebase.dataconnect.testutil.DataConnectIntegrationTestBase
2123import com.google.firebase.dataconnect.testutil.createGrpcManagedChannel
2224import com.google.firebase.dataconnect.testutil.property.arbitrary.pair
@@ -51,6 +53,7 @@ import io.kotest.property.ShrinkingMode
5153import io.kotest.property.arbitrary.Codepoint
5254import io.kotest.property.arbitrary.arbitrary
5355import io.kotest.property.arbitrary.az
56+ import io.kotest.property.arbitrary.constant
5457import io.kotest.property.arbitrary.distinct
5558import io.kotest.property.arbitrary.enum
5659import io.kotest.property.arbitrary.map
@@ -299,6 +302,68 @@ class ConnectRPCIntegrationTest : DataConnectIntegrationTestBase() {
299302 sendResult.isClosed shouldBe true
300303 }
301304
305+ @Test
306+ fun collectingFlowTwiceThrows () = runTest {
307+ val streams = connect()
308+ streams.sendInitRequest()
309+ val (streamRequest1, streamRequest2) = validStreamRequestArb().pair().sample()
310+ streams.outgoingRequests.send(streamRequest1)
311+
312+ turbineScope {
313+ val collectors = List (2 ) { streams.incomingResponses.testIn(backgroundScope) }
314+ val events = collectors.map { it.awaitEvent() }
315+
316+ withClue(" events=${events.print ().value} " ) {
317+ assertSoftly {
318+ withClue(" items" ) { events.count { it is Event .Item } shouldBe 1 }
319+ withClue(" errors" ) { events.count { it is Event .Error } shouldBe 1 }
320+ }
321+ }
322+
323+ // Make sure the "good" collector still works despite the other one failing.
324+ val (collector, streamResponse1) =
325+ collectors
326+ .zip(events)
327+ .mapNotNull { (collector, event) ->
328+ (event as ? Event .Item )?.let { Pair (collector, it.value) }
329+ }
330+ .single()
331+ streamResponse1.requestId shouldBe streamRequest1.requestId
332+ streams.outgoingRequests.send(streamRequest2)
333+ collector.awaitItem().requestId shouldBe streamRequest2.requestId
334+ }
335+ }
336+
337+ @Test
338+ fun registeringRequestIdTwiceThrows () = runTest {
339+ val streams = connect()
340+ streams.sendInitRequest()
341+ val requestId = requestIdArb().sample()
342+ val (streamRequest1, streamRequest2) =
343+ validSubscribeStreamRequestArb(
344+ requestId = Arb .constant(requestId),
345+ validExecuteQueryRequest =
346+ @OptIn(DelicateKotest ::class ) validExecuteQueryRequestArb().distinct(),
347+ )
348+ .pair()
349+ .sample()
350+
351+ check(streamRequest1.requestId == streamRequest2.requestId)
352+ check(streamRequest1.subscribe != streamRequest2.subscribe)
353+
354+ streams.incomingResponses.test {
355+ streams.outgoingRequests.send(streamRequest1)
356+ awaitItem().requestId shouldBe requestId
357+
358+ streams.outgoingRequests.send(streamRequest2)
359+ val exception = awaitError()
360+
361+ val statusException = exception.shouldBeInstanceOf<StatusException >()
362+ statusException.status.code shouldBe Status .Code .FAILED_PRECONDITION
363+ statusException.message shouldContainWithNonAbuttingTextIgnoringCase " request_id"
364+ }
365+ }
366+
302367 @Test
303368 fun testValidStreamRequestArb () = runTest {
304369 val streams = connect()
@@ -314,6 +379,30 @@ class ConnectRPCIntegrationTest : DataConnectIntegrationTestBase() {
314379 }
315380 }
316381
382+ @Test
383+ fun testValidSubscribeStreamRequestArb () = runTest {
384+ val streams = connect()
385+ streams.sendInitRequest()
386+
387+ // Use distinct UUIDs in the "subscribe" requests because the backend appears to do some weird
388+ // de-duping if the same query is subscribed to with distinct request IDs.
389+ val subscribeStreamRequestArb =
390+ validSubscribeStreamRequestArb(
391+ validExecuteQueryRequest =
392+ validExecuteQueryRequestArb(keyArb(@OptIn(DelicateKotest ::class ) Arb .uuid().distinct()))
393+ )
394+
395+ streams.incomingResponses.test {
396+ checkAll(propTestConfig, subscribeStreamRequestArb) { streamRequest ->
397+ check(streamRequest.hasSubscribe())
398+ streams.outgoingRequests.send(streamRequest)
399+ val streamResponse = awaitItem()
400+ streamResponse.requestId shouldBe streamRequest.requestId
401+ streamResponse.errorsCount shouldBe 0
402+ }
403+ }
404+ }
405+
317406 private suspend fun verifyExecuteRequestReturnsCorrectRequestIdAndCancelledTrue (
318407 streams : ConnectionStreams ,
319408 expectedErrorsCount : Int ,
@@ -350,7 +439,7 @@ class ConnectRPCIntegrationTest : DataConnectIntegrationTestBase() {
350439 private fun connect (): ConnectionStreams {
351440 val connector = RealtimeConnector .getInstance(dataConnectFactory)
352441 val grpcManagedChannel = createGrpcManagedChannel(connector.dataConnect)
353- val outgoingRequests = Channel <StreamRequest >(Channel . UNLIMITED )
442+ val outgoingRequests = Channel <StreamRequest >(UNLIMITED )
354443 val stub = ConnectorStreamServiceCoroutineStub (grpcManagedChannel)
355444 val incomingResponses = stub.connect(outgoingRequests.consumeAsFlow())
356445 return ConnectionStreams (connector, outgoingRequests, incomingResponses)
@@ -457,6 +546,7 @@ private inline fun <reified Data> StreamResponse.shouldHaveData(
457546}
458547
459548private enum class ValidStreamRequestType {
549+ SubscribeQuery ,
460550 ExecuteQuery ,
461551 ExecuteMutation ,
462552}
@@ -467,31 +557,66 @@ private enum class ValidStreamRequestType {
467557 */
468558private fun validStreamRequestArb (
469559 requestId : Arb <String > = @OptIn(DelicateKotest ::class ) requestIdArb().distinct(),
470- name : Arb <String > = nameArb (),
471- key : Arb <RealtimeConnector . Key > = keyArb (),
560+ validExecuteQueryRequest : Arb <ExecuteRequest > = validExecuteQueryRequestArb (),
561+ validExecuteMutationRequest : Arb <ExecuteRequest > = validExecuteMutationRequestArb (),
472562 validStreamRequestType : Arb <ValidStreamRequestType > = Arb .enum<ValidStreamRequestType >(),
473563): Arb <StreamRequest > = arbitrary {
474564 val streamRequest = StreamRequest .newBuilder()
475565 streamRequest.setRequestId(requestId.bind())
476566
477567 when (validStreamRequestType.bind()) {
478- ValidStreamRequestType .ExecuteQuery ->
479- streamRequest.setExecute(
480- ExecuteRequest .newBuilder().let { executeRequest ->
481- executeRequest.setOperationName(GetStringByKeyQuery .OPERATION_NAME )
482- executeRequest.setVariables(encodeToStruct(GetStringByKeyQuery .Variables (key.bind())))
483- executeRequest.build()
484- }
485- )
568+ ValidStreamRequestType .SubscribeQuery ->
569+ streamRequest.setSubscribe(validExecuteQueryRequest.bind())
570+ ValidStreamRequestType .ExecuteQuery -> streamRequest.setExecute(validExecuteQueryRequest.bind())
486571 ValidStreamRequestType .ExecuteMutation ->
487- streamRequest.setExecute(
488- ExecuteRequest .newBuilder().let { executeRequest ->
489- executeRequest.setOperationName(InsertStringMutation .OPERATION_NAME )
490- executeRequest.setVariables(encodeToStruct(InsertStringMutation .Variables (name.bind())))
491- executeRequest.build()
492- }
493- )
572+ streamRequest.setExecute(validExecuteMutationRequest.bind())
494573 }
495574
496575 streamRequest.build()
497576}
577+
578+ /* *
579+ * Creates and returns an [Arb] that generates [ExecuteRequest] objects that, when sent as the
580+ * [StreamRequest.setExecute] or [StreamRequest.setSubscribe] member of a [StreamRequest] to the
581+ * [ConnectionStreams.outgoingRequests] after the "init" request, should successfully run a query.
582+ */
583+ private fun validExecuteQueryRequestArb (
584+ key : Arb <RealtimeConnector .Key > = keyArb(),
585+ ): Arb <ExecuteRequest > = arbitrary {
586+ ExecuteRequest .newBuilder().let { executeRequest ->
587+ executeRequest.setOperationName(GetStringByKeyQuery .OPERATION_NAME )
588+ executeRequest.setVariables(encodeToStruct(GetStringByKeyQuery .Variables (key.bind())))
589+ executeRequest.build()
590+ }
591+ }
592+
593+ /* *
594+ * Creates and returns an [Arb] that generates [ExecuteRequest] objects that, when sent as the
595+ * [StreamRequest.setExecute] member of a [StreamRequest] to the
596+ * [ConnectionStreams.outgoingRequests] after the "init" request, should successfully run a
597+ * mutation.
598+ */
599+ private fun validExecuteMutationRequestArb (
600+ name : Arb <String > = nameArb(),
601+ ): Arb <ExecuteRequest > = arbitrary {
602+ ExecuteRequest .newBuilder().let { executeRequest ->
603+ executeRequest.setOperationName(InsertStringMutation .OPERATION_NAME )
604+ executeRequest.setVariables(encodeToStruct(InsertStringMutation .Variables (name.bind())))
605+ executeRequest.build()
606+ }
607+ }
608+
609+ /* *
610+ * Creates and returns an [Arb] that generates [StreamRequest] objects that, when sent to the
611+ * [ConnectionStreams.outgoingRequests] after the "init" request, should successfully register a
612+ * query subscription.
613+ */
614+ private fun validSubscribeStreamRequestArb (
615+ requestId : Arb <String > = @OptIn(DelicateKotest ::class ) requestIdArb().distinct(),
616+ validExecuteQueryRequest : Arb <ExecuteRequest > = validExecuteQueryRequestArb(),
617+ ): Arb <StreamRequest > = arbitrary {
618+ val streamRequest = StreamRequest .newBuilder()
619+ streamRequest.setRequestId(requestId.bind())
620+ streamRequest.setSubscribe(validExecuteQueryRequest.bind())
621+ streamRequest.build()
622+ }
0 commit comments