From 9f4f0af96eec7410459eb1fbc35328cf056f6f77 Mon Sep 17 00:00:00 2001 From: gaurav0107 Date: Fri, 15 May 2026 01:57:03 +0530 Subject: [PATCH] feat(server-subscriptions): expose flatMapMerge concurrency for websocket subscriptions Previously GraphQLWebSocketServer.handleSubscription piped the inbound client message flow through flatMapMerge with no explicit concurrency, so it defaulted to the kotlinx DEFAULT_CONCURRENCY of 16. A single websocket session holding more than 16 in-flight subscriptions silently back-pressured every subsequent message on the channel - including ping, complete, and new subscribe - until one of the 16 completed, causing 17th-and-later subscribes to look hung. Expose the concurrency as a configurable value threaded through the three construction paths (raw GraphQLWebSocketServer, Ktor subclass, Spring subclass) and both server configurations (Ktor KtorSubscriptionConfiguration, Spring SubscriptionConfigurationProperties). Default remains 16 so existing callers see no behaviour change; raising it (or Int.MAX_VALUE) avoids the hang described in the issue. A new regression test constructs the in-memory subscription server with subscriptionConcurrency=1 and verifies that two back-to-back subscribe messages are serialised (all first-id responses arrive before any second-id response), which would fail under the previous implicit default. --- .../graphql/server/ktor/GraphQL.kt | 3 +- .../server/ktor/GraphQLConfiguration.kt | 9 ++++ .../KtorGraphQLWebSocketServer.kt | 6 ++- .../subscription/GraphQLWebSocketServer.kt | 21 ++++++-- .../GraphQLWebSocketServerTest.kt | 54 +++++++++++++++++++ .../InMemoryGraphQLSubscriptionServer.kt | 10 +++- .../spring/GraphQLConfigurationProperties.kt | 10 +++- .../SubscriptionGraphQLWsAutoConfiguration.kt | 3 +- .../SubscriptionWebSocketHandler.kt | 6 ++- 9 files changed, 110 insertions(+), 12 deletions(-) diff --git a/servers/graphql-kotlin-ktor-server/src/main/kotlin/com/expediagroup/graphql/server/ktor/GraphQL.kt b/servers/graphql-kotlin-ktor-server/src/main/kotlin/com/expediagroup/graphql/server/ktor/GraphQL.kt index aebbf275f7..aeba1fc79d 100644 --- a/servers/graphql-kotlin-ktor-server/src/main/kotlin/com/expediagroup/graphql/server/ktor/GraphQL.kt +++ b/servers/graphql-kotlin-ktor-server/src/main/kotlin/com/expediagroup/graphql/server/ktor/GraphQL.kt @@ -169,7 +169,8 @@ class GraphQL(config: GraphQLConfiguration) { subscriptionHooks = config.server.subscriptions.hooks, requestHandler = requestHandler, initTimeoutMillis = config.server.subscriptions.connectionInitTimeout, - objectMapper = jacksonMapperBuilder().apply(config.server.jacksonConfiguration).build() + objectMapper = jacksonMapperBuilder().apply(config.server.jacksonConfiguration).build(), + subscriptionConcurrency = config.server.subscriptions.subscriptionConcurrency ) } diff --git a/servers/graphql-kotlin-ktor-server/src/main/kotlin/com/expediagroup/graphql/server/ktor/GraphQLConfiguration.kt b/servers/graphql-kotlin-ktor-server/src/main/kotlin/com/expediagroup/graphql/server/ktor/GraphQLConfiguration.kt index c0a4d0fb57..56e44e46db 100644 --- a/servers/graphql-kotlin-ktor-server/src/main/kotlin/com/expediagroup/graphql/server/ktor/GraphQLConfiguration.kt +++ b/servers/graphql-kotlin-ktor-server/src/main/kotlin/com/expediagroup/graphql/server/ktor/GraphQLConfiguration.kt @@ -26,6 +26,7 @@ import com.expediagroup.graphql.generator.hooks.FlowSubscriptionSchemaGeneratorH import com.expediagroup.graphql.generator.hooks.SchemaGeneratorHooks import com.expediagroup.graphql.generator.scalars.IDValueUnboxer import com.expediagroup.graphql.server.Schema +import com.expediagroup.graphql.server.execution.subscription.DEFAULT_WS_SUBSCRIPTION_CONCURRENCY import com.expediagroup.graphql.server.ktor.subscriptions.DefaultKtorGraphQLSubscriptionContextFactory import com.expediagroup.graphql.server.ktor.subscriptions.DefaultKtorGraphQLSubscriptionHooks import com.expediagroup.graphql.server.ktor.subscriptions.DefaultKtorGraphQLSubscriptionRequestParser @@ -284,6 +285,14 @@ class GraphQLConfiguration(config: ApplicationConfig) { var hooks: KtorGraphQLSubscriptionHooks = DefaultKtorGraphQLSubscriptionHooks() /** Server timeout between establishing web socket connection and receiving connection-init message */ var connectionInitTimeout: Long = config.tryGetString("graphql.server.subscription.connectionInitTimeout")?.toLongOrNull() ?: 60_000 + /** + * Maximum number of inbound client messages processed concurrently per web socket session. Defaults to + * [DEFAULT_WS_SUBSCRIPTION_CONCURRENCY] (16). Raise this when a single session may hold more than the default + * number of simultaneous subscriptions, otherwise additional messages (including ping/complete/subscribe) + * are back-pressured until one of the in-flight messages completes. + */ + var subscriptionConcurrency: Int = + config.tryGetString("graphql.server.subscription.concurrency")?.toIntOrNull() ?: DEFAULT_WS_SUBSCRIPTION_CONCURRENCY } } diff --git a/servers/graphql-kotlin-ktor-server/src/main/kotlin/com/expediagroup/graphql/server/ktor/subscriptions/KtorGraphQLWebSocketServer.kt b/servers/graphql-kotlin-ktor-server/src/main/kotlin/com/expediagroup/graphql/server/ktor/subscriptions/KtorGraphQLWebSocketServer.kt index dd759b73c1..2b8e8ed0ee 100644 --- a/servers/graphql-kotlin-ktor-server/src/main/kotlin/com/expediagroup/graphql/server/ktor/subscriptions/KtorGraphQLWebSocketServer.kt +++ b/servers/graphql-kotlin-ktor-server/src/main/kotlin/com/expediagroup/graphql/server/ktor/subscriptions/KtorGraphQLWebSocketServer.kt @@ -17,6 +17,7 @@ package com.expediagroup.graphql.server.ktor.subscriptions import com.expediagroup.graphql.server.execution.GraphQLRequestHandler +import com.expediagroup.graphql.server.execution.subscription.DEFAULT_WS_SUBSCRIPTION_CONCURRENCY import com.expediagroup.graphql.server.execution.subscription.GraphQLWebSocketServer import com.expediagroup.graphql.server.types.GraphQLSubscriptionStatus import io.ktor.server.websocket.WebSocketServerSession @@ -36,9 +37,10 @@ class KtorGraphQLWebSocketServer( subscriptionHooks: KtorGraphQLSubscriptionHooks, requestHandler: GraphQLRequestHandler, initTimeoutMillis: Long, - objectMapper: ObjectMapper + objectMapper: ObjectMapper, + subscriptionConcurrency: Int = DEFAULT_WS_SUBSCRIPTION_CONCURRENCY ) : GraphQLWebSocketServer( - requestParser, contextFactory, subscriptionHooks, requestHandler, initTimeoutMillis, objectMapper + requestParser, contextFactory, subscriptionHooks, requestHandler, initTimeoutMillis, objectMapper, subscriptionConcurrency ) { override suspend fun closeSession(session: WebSocketServerSession, reason: GraphQLSubscriptionStatus) { session.close(CloseReason(reason.code.toShort(), reason.reason)) diff --git a/servers/graphql-kotlin-server/src/main/kotlin/com/expediagroup/graphql/server/execution/subscription/GraphQLWebSocketServer.kt b/servers/graphql-kotlin-server/src/main/kotlin/com/expediagroup/graphql/server/execution/subscription/GraphQLWebSocketServer.kt index c49b064dcd..47e1a43345 100644 --- a/servers/graphql-kotlin-server/src/main/kotlin/com/expediagroup/graphql/server/execution/subscription/GraphQLWebSocketServer.kt +++ b/servers/graphql-kotlin-server/src/main/kotlin/com/expediagroup/graphql/server/execution/subscription/GraphQLWebSocketServer.kt @@ -57,7 +57,21 @@ import kotlin.coroutines.EmptyCoroutineContext const val GRAPHQL_WS_PROTOCOL = "graphql-transport-ws" /** - * GraphQL Web Socket server implementation for handling subscriptions using *graphql-transport-ws* protocol + * Default maximum number of in-flight inbound messages processed concurrently by a single web socket session. + * + * Matches the historical default of `kotlinx.coroutines.flow.flatMapMerge` (16) that was used before this value + * was made configurable. Raising the limit lets a single session process more simultaneous subscriptions at the + * cost of higher peak memory; `Int.MAX_VALUE` effectively removes the limit; `1` serializes message processing. + */ +const val DEFAULT_WS_SUBSCRIPTION_CONCURRENCY: Int = 16 + +/** + * GraphQL Web Socket server implementation for handling subscriptions using *graphql-transport-ws* protocol. + * + * @param subscriptionConcurrency maximum number of inbound client messages processed concurrently by the + * `flatMapMerge` operator that drives [handleSubscription]. At the default ([DEFAULT_WS_SUBSCRIPTION_CONCURRENCY]) + * in-flight subscriptions can back-pressure sibling protocol messages (ping, complete, new subscribe) once the + * ceiling is hit; see issue #2018. Pass a larger value, or `Int.MAX_VALUE`, to avoid this. * * @see graphql-transport-ws protocol */ @@ -67,7 +81,8 @@ abstract class GraphQLWebSocketServer( private val subscriptionHooks: GraphQLSubscriptionHooks, private val requestHandler: GraphQLRequestHandler, private val initTimeoutMillis: Long, - private val objectMapper: ObjectMapper = jacksonObjectMapper() + private val objectMapper: ObjectMapper = jacksonObjectMapper(), + private val subscriptionConcurrency: Int = DEFAULT_WS_SUBSCRIPTION_CONCURRENCY ) { private val logger: Logger = LoggerFactory.getLogger(GraphQLWebSocketServer::class.java) private val subscriptionScope = CoroutineScope(SupervisorJob()) @@ -86,7 +101,7 @@ abstract class GraphQLWebSocketServer( requestParser.parseRequestFlow(session) .map { objectMapper.readValue(it) } - .flatMapMerge { message -> + .flatMapMerge(concurrency = subscriptionConcurrency) { message -> channelFlow { when (message) { is SubscriptionMessageConnectionInit -> { diff --git a/servers/graphql-kotlin-server/src/test/kotlin/com/expediagroup/graphql/server/execution/subscription/GraphQLWebSocketServerTest.kt b/servers/graphql-kotlin-server/src/test/kotlin/com/expediagroup/graphql/server/execution/subscription/GraphQLWebSocketServerTest.kt index c71c5ee445..1128a93e61 100644 --- a/servers/graphql-kotlin-server/src/test/kotlin/com/expediagroup/graphql/server/execution/subscription/GraphQLWebSocketServerTest.kt +++ b/servers/graphql-kotlin-server/src/test/kotlin/com/expediagroup/graphql/server/execution/subscription/GraphQLWebSocketServerTest.kt @@ -300,6 +300,60 @@ class GraphQLWebSocketServerTest { subscriptionJob.cancelAndJoin() } + @Test + fun `verify subscription flow honors configured concurrency`() = runTest { + // concurrency=1 serializes inbound message processing: each subscribe's channelFlow must fully complete + // (counter 1,2,3 + complete) before the next subscribe starts flowing. With the historical default (16) + // the two subscriptions would interleave. See issue #2018. + val handler = GraphQLRequestHandler(graphQL = testGraphQLEngine()) + val testServer = InMemoryGraphQLSubscriptionServer( + requestHandler = handler, + subscriptionConcurrency = 1 + ) + + val session = Channel(Channel.BUFFERED) + val responseChannel = testServer.outboundChannel + + val subscriptionJob = launch { + testServer.handleSubscription(session) + .collect() + } + + session.send(mapper.writeValueAsString(SubscriptionMessageConnectionInit())) + val ack: GraphQLSubscriptionMessage = mapper.readValue(responseChannel.receive()) + assertEquals(GRAPHQL_WS_CONNECTION_ACK, ack.type) + + val firstId = UUID.randomUUID().toString() + val secondId = UUID.randomUUID().toString() + val request = GraphQLRequest(query = "subscription { counter }") + session.send(mapper.writeValueAsString(SubscriptionMessageSubscribe(id = firstId, payload = request))) + session.send(mapper.writeValueAsString(SubscriptionMessageSubscribe(id = secondId, payload = request))) + + // With concurrency=1 the second subscribe is held behind the first, so every response for firstId + // (3 next + 1 complete) must arrive before any response for secondId. + val firstResponseIds = (1..4).map { + val msg: GraphQLSubscriptionMessage = mapper.readValue(responseChannel.receive()) + when (msg) { + is SubscriptionMessageNext -> msg.id + is SubscriptionMessageComplete -> msg.id + else -> error("unexpected message type: $msg") + } + } + assertTrue(firstResponseIds.all { it == firstId }, "expected all first-batch ids == $firstId but got $firstResponseIds") + + val secondResponseIds = (1..4).map { + val msg: GraphQLSubscriptionMessage = mapper.readValue(responseChannel.receive()) + when (msg) { + is SubscriptionMessageNext -> msg.id + is SubscriptionMessageComplete -> msg.id + else -> error("unexpected message type: $msg") + } + } + assertTrue(secondResponseIds.all { it == secondId }, "expected all second-batch ids == $secondId but got $secondResponseIds") + + subscriptionJob.cancelAndJoin() + } + private fun testGraphQLEngine(): GraphQL = GraphQL.newGraphQL( toSchema( config = SchemaGeneratorConfig( diff --git a/servers/graphql-kotlin-server/src/test/kotlin/com/expediagroup/graphql/server/execution/subscription/InMemoryGraphQLSubscriptionServer.kt b/servers/graphql-kotlin-server/src/test/kotlin/com/expediagroup/graphql/server/execution/subscription/InMemoryGraphQLSubscriptionServer.kt index b009b4a652..426022c729 100644 --- a/servers/graphql-kotlin-server/src/test/kotlin/com/expediagroup/graphql/server/execution/subscription/InMemoryGraphQLSubscriptionServer.kt +++ b/servers/graphql-kotlin-server/src/test/kotlin/com/expediagroup/graphql/server/execution/subscription/InMemoryGraphQLSubscriptionServer.kt @@ -30,9 +30,15 @@ class InMemoryGraphQLSubscriptionServer( requestParser: InMemorySubscriptionRequestParser = InMemorySubscriptionRequestParser(), contextFactory: InMemorySubscriptionContextFactory = InMemorySubscriptionContextFactory(), hooks: InMemorySubscriptionHooks = InMemorySubscriptionHooks(), - timeoutInMillis: Long = 1000 + timeoutInMillis: Long = 1000, + subscriptionConcurrency: Int = DEFAULT_WS_SUBSCRIPTION_CONCURRENCY ) : GraphQLWebSocketServer, String>( - requestParser, contextFactory, hooks, requestHandler, timeoutInMillis + requestParser = requestParser, + contextFactory = contextFactory, + subscriptionHooks = hooks, + requestHandler = requestHandler, + initTimeoutMillis = timeoutInMillis, + subscriptionConcurrency = subscriptionConcurrency ) { val outboundChannel = Channel(Channel.BUFFERED) diff --git a/servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/GraphQLConfigurationProperties.kt b/servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/GraphQLConfigurationProperties.kt index 91412583ff..32fe11d660 100644 --- a/servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/GraphQLConfigurationProperties.kt +++ b/servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/GraphQLConfigurationProperties.kt @@ -16,6 +16,7 @@ package com.expediagroup.graphql.server.spring +import com.expediagroup.graphql.server.execution.subscription.DEFAULT_WS_SUBSCRIPTION_CONCURRENCY import org.springframework.boot.context.properties.ConfigurationProperties import org.springframework.boot.context.properties.NestedConfigurationProperty @@ -91,7 +92,14 @@ data class GraphQLConfigurationProperties( /** Server timeout between establishing web socket connection and receiving connection-init message. */ val connectionInitTimeout: Long = 60_000, /** WebSocket based subscription protocol */ - val protocol: SubscriptionProtocol = SubscriptionProtocol.GRAPHQL_WS + val protocol: SubscriptionProtocol = SubscriptionProtocol.GRAPHQL_WS, + /** + * Maximum number of inbound client messages processed concurrently per web socket session. Defaults to + * [DEFAULT_WS_SUBSCRIPTION_CONCURRENCY] (16). Raise this when a single session may hold more than the + * default number of simultaneous subscriptions, otherwise additional messages (including ping/complete/ + * subscribe) are back-pressured until one of the in-flight messages completes. + */ + val subscriptionConcurrency: Int = DEFAULT_WS_SUBSCRIPTION_CONCURRENCY ) /** diff --git a/servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/SubscriptionGraphQLWsAutoConfiguration.kt b/servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/SubscriptionGraphQLWsAutoConfiguration.kt index 6b0f9dbea3..7bd986a613 100644 --- a/servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/SubscriptionGraphQLWsAutoConfiguration.kt +++ b/servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/SubscriptionGraphQLWsAutoConfiguration.kt @@ -63,6 +63,7 @@ class SubscriptionGraphQLWsAutoConfiguration { subscriptionHooks, handler, config.subscriptions.connectionInitTimeout, - objectMapper + objectMapper, + config.subscriptions.subscriptionConcurrency ) } diff --git a/servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/subscriptions/SubscriptionWebSocketHandler.kt b/servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/subscriptions/SubscriptionWebSocketHandler.kt index 8e8c3cf687..33b13ce483 100644 --- a/servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/subscriptions/SubscriptionWebSocketHandler.kt +++ b/servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/subscriptions/SubscriptionWebSocketHandler.kt @@ -17,6 +17,7 @@ package com.expediagroup.graphql.server.spring.subscriptions import com.expediagroup.graphql.server.execution.GraphQLRequestHandler +import com.expediagroup.graphql.server.execution.subscription.DEFAULT_WS_SUBSCRIPTION_CONCURRENCY import com.expediagroup.graphql.server.execution.subscription.GRAPHQL_WS_PROTOCOL import com.expediagroup.graphql.server.execution.subscription.GraphQLWebSocketServer import com.expediagroup.graphql.server.types.GraphQLSubscriptionStatus @@ -40,9 +41,10 @@ class SubscriptionWebSocketHandler( subscriptionHooks: SpringGraphQLSubscriptionHooks, graphqlHandler: GraphQLRequestHandler, initTimeoutMillis: Long, - objectMapper: ObjectMapper + objectMapper: ObjectMapper, + subscriptionConcurrency: Int = DEFAULT_WS_SUBSCRIPTION_CONCURRENCY ) : WebSocketHandler, GraphQLWebSocketServer( - requestParser, contextFactory, subscriptionHooks, graphqlHandler, initTimeoutMillis, objectMapper + requestParser, contextFactory, subscriptionHooks, graphqlHandler, initTimeoutMillis, objectMapper, subscriptionConcurrency ) { override fun handle(session: WebSocketSession): Mono = session.send( flux {