diff --git a/servers/graphql-kotlin-ktor-server/src/main/kotlin/com/expediagroup/graphql/server/ktor/GraphQL.kt b/servers/graphql-kotlin-ktor-server/src/main/kotlin/com/expediagroup/graphql/server/ktor/GraphQL.kt index aebbf275f7..aeba1fc79d 100644 --- a/servers/graphql-kotlin-ktor-server/src/main/kotlin/com/expediagroup/graphql/server/ktor/GraphQL.kt +++ b/servers/graphql-kotlin-ktor-server/src/main/kotlin/com/expediagroup/graphql/server/ktor/GraphQL.kt @@ -169,7 +169,8 @@ class GraphQL(config: GraphQLConfiguration) { subscriptionHooks = config.server.subscriptions.hooks, requestHandler = requestHandler, initTimeoutMillis = config.server.subscriptions.connectionInitTimeout, - objectMapper = jacksonMapperBuilder().apply(config.server.jacksonConfiguration).build() + objectMapper = jacksonMapperBuilder().apply(config.server.jacksonConfiguration).build(), + subscriptionConcurrency = config.server.subscriptions.subscriptionConcurrency ) } diff --git a/servers/graphql-kotlin-ktor-server/src/main/kotlin/com/expediagroup/graphql/server/ktor/GraphQLConfiguration.kt b/servers/graphql-kotlin-ktor-server/src/main/kotlin/com/expediagroup/graphql/server/ktor/GraphQLConfiguration.kt index c0a4d0fb57..56e44e46db 100644 --- a/servers/graphql-kotlin-ktor-server/src/main/kotlin/com/expediagroup/graphql/server/ktor/GraphQLConfiguration.kt +++ b/servers/graphql-kotlin-ktor-server/src/main/kotlin/com/expediagroup/graphql/server/ktor/GraphQLConfiguration.kt @@ -26,6 +26,7 @@ import com.expediagroup.graphql.generator.hooks.FlowSubscriptionSchemaGeneratorH import com.expediagroup.graphql.generator.hooks.SchemaGeneratorHooks import com.expediagroup.graphql.generator.scalars.IDValueUnboxer import com.expediagroup.graphql.server.Schema +import com.expediagroup.graphql.server.execution.subscription.DEFAULT_WS_SUBSCRIPTION_CONCURRENCY import com.expediagroup.graphql.server.ktor.subscriptions.DefaultKtorGraphQLSubscriptionContextFactory import com.expediagroup.graphql.server.ktor.subscriptions.DefaultKtorGraphQLSubscriptionHooks import com.expediagroup.graphql.server.ktor.subscriptions.DefaultKtorGraphQLSubscriptionRequestParser @@ -284,6 +285,14 @@ class GraphQLConfiguration(config: ApplicationConfig) { var hooks: KtorGraphQLSubscriptionHooks = DefaultKtorGraphQLSubscriptionHooks() /** Server timeout between establishing web socket connection and receiving connection-init message */ var connectionInitTimeout: Long = config.tryGetString("graphql.server.subscription.connectionInitTimeout")?.toLongOrNull() ?: 60_000 + /** + * Maximum number of inbound client messages processed concurrently per web socket session. Defaults to + * [DEFAULT_WS_SUBSCRIPTION_CONCURRENCY] (16). Raise this when a single session may hold more than the default + * number of simultaneous subscriptions, otherwise additional messages (including ping/complete/subscribe) + * are back-pressured until one of the in-flight messages completes. + */ + var subscriptionConcurrency: Int = + config.tryGetString("graphql.server.subscription.concurrency")?.toIntOrNull() ?: DEFAULT_WS_SUBSCRIPTION_CONCURRENCY } } diff --git a/servers/graphql-kotlin-ktor-server/src/main/kotlin/com/expediagroup/graphql/server/ktor/subscriptions/KtorGraphQLWebSocketServer.kt b/servers/graphql-kotlin-ktor-server/src/main/kotlin/com/expediagroup/graphql/server/ktor/subscriptions/KtorGraphQLWebSocketServer.kt index dd759b73c1..2b8e8ed0ee 100644 --- a/servers/graphql-kotlin-ktor-server/src/main/kotlin/com/expediagroup/graphql/server/ktor/subscriptions/KtorGraphQLWebSocketServer.kt +++ b/servers/graphql-kotlin-ktor-server/src/main/kotlin/com/expediagroup/graphql/server/ktor/subscriptions/KtorGraphQLWebSocketServer.kt @@ -17,6 +17,7 @@ package com.expediagroup.graphql.server.ktor.subscriptions import com.expediagroup.graphql.server.execution.GraphQLRequestHandler +import com.expediagroup.graphql.server.execution.subscription.DEFAULT_WS_SUBSCRIPTION_CONCURRENCY import com.expediagroup.graphql.server.execution.subscription.GraphQLWebSocketServer import com.expediagroup.graphql.server.types.GraphQLSubscriptionStatus import io.ktor.server.websocket.WebSocketServerSession @@ -36,9 +37,10 @@ class KtorGraphQLWebSocketServer( subscriptionHooks: KtorGraphQLSubscriptionHooks, requestHandler: GraphQLRequestHandler, initTimeoutMillis: Long, - objectMapper: ObjectMapper + objectMapper: ObjectMapper, + subscriptionConcurrency: Int = DEFAULT_WS_SUBSCRIPTION_CONCURRENCY ) : GraphQLWebSocketServer( - requestParser, contextFactory, subscriptionHooks, requestHandler, initTimeoutMillis, objectMapper + requestParser, contextFactory, subscriptionHooks, requestHandler, initTimeoutMillis, objectMapper, subscriptionConcurrency ) { override suspend fun closeSession(session: WebSocketServerSession, reason: GraphQLSubscriptionStatus) { session.close(CloseReason(reason.code.toShort(), reason.reason)) diff --git a/servers/graphql-kotlin-server/src/main/kotlin/com/expediagroup/graphql/server/execution/subscription/GraphQLWebSocketServer.kt b/servers/graphql-kotlin-server/src/main/kotlin/com/expediagroup/graphql/server/execution/subscription/GraphQLWebSocketServer.kt index c49b064dcd..47e1a43345 100644 --- a/servers/graphql-kotlin-server/src/main/kotlin/com/expediagroup/graphql/server/execution/subscription/GraphQLWebSocketServer.kt +++ b/servers/graphql-kotlin-server/src/main/kotlin/com/expediagroup/graphql/server/execution/subscription/GraphQLWebSocketServer.kt @@ -57,7 +57,21 @@ import kotlin.coroutines.EmptyCoroutineContext const val GRAPHQL_WS_PROTOCOL = "graphql-transport-ws" /** - * GraphQL Web Socket server implementation for handling subscriptions using *graphql-transport-ws* protocol + * Default maximum number of in-flight inbound messages processed concurrently by a single web socket session. + * + * Matches the historical default of `kotlinx.coroutines.flow.flatMapMerge` (16) that was used before this value + * was made configurable. Raising the limit lets a single session process more simultaneous subscriptions at the + * cost of higher peak memory; `Int.MAX_VALUE` effectively removes the limit; `1` serializes message processing. + */ +const val DEFAULT_WS_SUBSCRIPTION_CONCURRENCY: Int = 16 + +/** + * GraphQL Web Socket server implementation for handling subscriptions using *graphql-transport-ws* protocol. + * + * @param subscriptionConcurrency maximum number of inbound client messages processed concurrently by the + * `flatMapMerge` operator that drives [handleSubscription]. At the default ([DEFAULT_WS_SUBSCRIPTION_CONCURRENCY]) + * in-flight subscriptions can back-pressure sibling protocol messages (ping, complete, new subscribe) once the + * ceiling is hit; see issue #2018. Pass a larger value, or `Int.MAX_VALUE`, to avoid this. * * @see graphql-transport-ws protocol */ @@ -67,7 +81,8 @@ abstract class GraphQLWebSocketServer( private val subscriptionHooks: GraphQLSubscriptionHooks, private val requestHandler: GraphQLRequestHandler, private val initTimeoutMillis: Long, - private val objectMapper: ObjectMapper = jacksonObjectMapper() + private val objectMapper: ObjectMapper = jacksonObjectMapper(), + private val subscriptionConcurrency: Int = DEFAULT_WS_SUBSCRIPTION_CONCURRENCY ) { private val logger: Logger = LoggerFactory.getLogger(GraphQLWebSocketServer::class.java) private val subscriptionScope = CoroutineScope(SupervisorJob()) @@ -86,7 +101,7 @@ abstract class GraphQLWebSocketServer( requestParser.parseRequestFlow(session) .map { objectMapper.readValue(it) } - .flatMapMerge { message -> + .flatMapMerge(concurrency = subscriptionConcurrency) { message -> channelFlow { when (message) { is SubscriptionMessageConnectionInit -> { diff --git a/servers/graphql-kotlin-server/src/test/kotlin/com/expediagroup/graphql/server/execution/subscription/GraphQLWebSocketServerTest.kt b/servers/graphql-kotlin-server/src/test/kotlin/com/expediagroup/graphql/server/execution/subscription/GraphQLWebSocketServerTest.kt index c71c5ee445..1128a93e61 100644 --- a/servers/graphql-kotlin-server/src/test/kotlin/com/expediagroup/graphql/server/execution/subscription/GraphQLWebSocketServerTest.kt +++ b/servers/graphql-kotlin-server/src/test/kotlin/com/expediagroup/graphql/server/execution/subscription/GraphQLWebSocketServerTest.kt @@ -300,6 +300,60 @@ class GraphQLWebSocketServerTest { subscriptionJob.cancelAndJoin() } + @Test + fun `verify subscription flow honors configured concurrency`() = runTest { + // concurrency=1 serializes inbound message processing: each subscribe's channelFlow must fully complete + // (counter 1,2,3 + complete) before the next subscribe starts flowing. With the historical default (16) + // the two subscriptions would interleave. See issue #2018. + val handler = GraphQLRequestHandler(graphQL = testGraphQLEngine()) + val testServer = InMemoryGraphQLSubscriptionServer( + requestHandler = handler, + subscriptionConcurrency = 1 + ) + + val session = Channel(Channel.BUFFERED) + val responseChannel = testServer.outboundChannel + + val subscriptionJob = launch { + testServer.handleSubscription(session) + .collect() + } + + session.send(mapper.writeValueAsString(SubscriptionMessageConnectionInit())) + val ack: GraphQLSubscriptionMessage = mapper.readValue(responseChannel.receive()) + assertEquals(GRAPHQL_WS_CONNECTION_ACK, ack.type) + + val firstId = UUID.randomUUID().toString() + val secondId = UUID.randomUUID().toString() + val request = GraphQLRequest(query = "subscription { counter }") + session.send(mapper.writeValueAsString(SubscriptionMessageSubscribe(id = firstId, payload = request))) + session.send(mapper.writeValueAsString(SubscriptionMessageSubscribe(id = secondId, payload = request))) + + // With concurrency=1 the second subscribe is held behind the first, so every response for firstId + // (3 next + 1 complete) must arrive before any response for secondId. + val firstResponseIds = (1..4).map { + val msg: GraphQLSubscriptionMessage = mapper.readValue(responseChannel.receive()) + when (msg) { + is SubscriptionMessageNext -> msg.id + is SubscriptionMessageComplete -> msg.id + else -> error("unexpected message type: $msg") + } + } + assertTrue(firstResponseIds.all { it == firstId }, "expected all first-batch ids == $firstId but got $firstResponseIds") + + val secondResponseIds = (1..4).map { + val msg: GraphQLSubscriptionMessage = mapper.readValue(responseChannel.receive()) + when (msg) { + is SubscriptionMessageNext -> msg.id + is SubscriptionMessageComplete -> msg.id + else -> error("unexpected message type: $msg") + } + } + assertTrue(secondResponseIds.all { it == secondId }, "expected all second-batch ids == $secondId but got $secondResponseIds") + + subscriptionJob.cancelAndJoin() + } + private fun testGraphQLEngine(): GraphQL = GraphQL.newGraphQL( toSchema( config = SchemaGeneratorConfig( diff --git a/servers/graphql-kotlin-server/src/test/kotlin/com/expediagroup/graphql/server/execution/subscription/InMemoryGraphQLSubscriptionServer.kt b/servers/graphql-kotlin-server/src/test/kotlin/com/expediagroup/graphql/server/execution/subscription/InMemoryGraphQLSubscriptionServer.kt index b009b4a652..426022c729 100644 --- a/servers/graphql-kotlin-server/src/test/kotlin/com/expediagroup/graphql/server/execution/subscription/InMemoryGraphQLSubscriptionServer.kt +++ b/servers/graphql-kotlin-server/src/test/kotlin/com/expediagroup/graphql/server/execution/subscription/InMemoryGraphQLSubscriptionServer.kt @@ -30,9 +30,15 @@ class InMemoryGraphQLSubscriptionServer( requestParser: InMemorySubscriptionRequestParser = InMemorySubscriptionRequestParser(), contextFactory: InMemorySubscriptionContextFactory = InMemorySubscriptionContextFactory(), hooks: InMemorySubscriptionHooks = InMemorySubscriptionHooks(), - timeoutInMillis: Long = 1000 + timeoutInMillis: Long = 1000, + subscriptionConcurrency: Int = DEFAULT_WS_SUBSCRIPTION_CONCURRENCY ) : GraphQLWebSocketServer, String>( - requestParser, contextFactory, hooks, requestHandler, timeoutInMillis + requestParser = requestParser, + contextFactory = contextFactory, + subscriptionHooks = hooks, + requestHandler = requestHandler, + initTimeoutMillis = timeoutInMillis, + subscriptionConcurrency = subscriptionConcurrency ) { val outboundChannel = Channel(Channel.BUFFERED) diff --git a/servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/GraphQLConfigurationProperties.kt b/servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/GraphQLConfigurationProperties.kt index 91412583ff..32fe11d660 100644 --- a/servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/GraphQLConfigurationProperties.kt +++ b/servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/GraphQLConfigurationProperties.kt @@ -16,6 +16,7 @@ package com.expediagroup.graphql.server.spring +import com.expediagroup.graphql.server.execution.subscription.DEFAULT_WS_SUBSCRIPTION_CONCURRENCY import org.springframework.boot.context.properties.ConfigurationProperties import org.springframework.boot.context.properties.NestedConfigurationProperty @@ -91,7 +92,14 @@ data class GraphQLConfigurationProperties( /** Server timeout between establishing web socket connection and receiving connection-init message. */ val connectionInitTimeout: Long = 60_000, /** WebSocket based subscription protocol */ - val protocol: SubscriptionProtocol = SubscriptionProtocol.GRAPHQL_WS + val protocol: SubscriptionProtocol = SubscriptionProtocol.GRAPHQL_WS, + /** + * Maximum number of inbound client messages processed concurrently per web socket session. Defaults to + * [DEFAULT_WS_SUBSCRIPTION_CONCURRENCY] (16). Raise this when a single session may hold more than the + * default number of simultaneous subscriptions, otherwise additional messages (including ping/complete/ + * subscribe) are back-pressured until one of the in-flight messages completes. + */ + val subscriptionConcurrency: Int = DEFAULT_WS_SUBSCRIPTION_CONCURRENCY ) /** diff --git a/servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/SubscriptionGraphQLWsAutoConfiguration.kt b/servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/SubscriptionGraphQLWsAutoConfiguration.kt index 6b0f9dbea3..7bd986a613 100644 --- a/servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/SubscriptionGraphQLWsAutoConfiguration.kt +++ b/servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/SubscriptionGraphQLWsAutoConfiguration.kt @@ -63,6 +63,7 @@ class SubscriptionGraphQLWsAutoConfiguration { subscriptionHooks, handler, config.subscriptions.connectionInitTimeout, - objectMapper + objectMapper, + config.subscriptions.subscriptionConcurrency ) } diff --git a/servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/subscriptions/SubscriptionWebSocketHandler.kt b/servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/subscriptions/SubscriptionWebSocketHandler.kt index 8e8c3cf687..33b13ce483 100644 --- a/servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/subscriptions/SubscriptionWebSocketHandler.kt +++ b/servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/subscriptions/SubscriptionWebSocketHandler.kt @@ -17,6 +17,7 @@ package com.expediagroup.graphql.server.spring.subscriptions import com.expediagroup.graphql.server.execution.GraphQLRequestHandler +import com.expediagroup.graphql.server.execution.subscription.DEFAULT_WS_SUBSCRIPTION_CONCURRENCY import com.expediagroup.graphql.server.execution.subscription.GRAPHQL_WS_PROTOCOL import com.expediagroup.graphql.server.execution.subscription.GraphQLWebSocketServer import com.expediagroup.graphql.server.types.GraphQLSubscriptionStatus @@ -40,9 +41,10 @@ class SubscriptionWebSocketHandler( subscriptionHooks: SpringGraphQLSubscriptionHooks, graphqlHandler: GraphQLRequestHandler, initTimeoutMillis: Long, - objectMapper: ObjectMapper + objectMapper: ObjectMapper, + subscriptionConcurrency: Int = DEFAULT_WS_SUBSCRIPTION_CONCURRENCY ) : WebSocketHandler, GraphQLWebSocketServer( - requestParser, contextFactory, subscriptionHooks, graphqlHandler, initTimeoutMillis, objectMapper + requestParser, contextFactory, subscriptionHooks, graphqlHandler, initTimeoutMillis, objectMapper, subscriptionConcurrency ) { override fun handle(session: WebSocketSession): Mono = session.send( flux {