Skip to content

Commit 2f2fa3f

Browse files
authored
feat(server-subscriptions): expose flatMapMerge concurrency for websocket subscriptions (#2175)
### 📝 Description `GraphQLWebSocketServer.handleSubscription` pipes the inbound client-message flow through `flatMapMerge { ... }` without an explicit `concurrency` argument, so it falls back to the kotlinx `DEFAULT_CONCURRENCY = 16`. A single websocket session holding more than 16 in-flight subscriptions silently back-pressures every subsequent inbound message on the underlying flow — including `ping`, `complete`, and additional `subscribe` messages — until one of the 16 in-flight messages completes. The 17th-and-later subscribes therefore look hung from the client's perspective even though the transport is healthy. This change exposes the concurrency as a configurable value, threaded through the three construction paths and both server configurations: - `GraphQLWebSocketServer` — adds a final constructor parameter `subscriptionConcurrency: Int = DEFAULT_WS_SUBSCRIPTION_CONCURRENCY` (top-level `const val = 16`), used as the `concurrency` argument to `flatMapMerge`. - `KtorGraphQLWebSocketServer` — forwards a matching parameter to the superclass constructor. - `KtorSubscriptionConfiguration` — reads `graphql.server.subscription.concurrency` from `ApplicationConfig` with the same default; `GraphQL.kt` wires it into the Ktor handler. - `SubscriptionWebSocketHandler` (Spring) — forwards a matching parameter to the superclass constructor. - `SubscriptionConfigurationProperties` — adds `subscriptionConcurrency: Int = DEFAULT_WS_SUBSCRIPTION_CONCURRENCY` as a trailing, defaulted field (preserves data-class binary compatibility); `SubscriptionGraphQLWsAutoConfiguration` wires it through. Defaults are unchanged (16), so existing callers see identical behaviour. Users who hold many simultaneous subscriptions per session can now raise the value (e.g. `Int.MAX_VALUE`) to avoid the back-pressure hang described in the issue. A new regression test (`verify subscription flow honors configured concurrency`) constructs the in-memory subscription server with `subscriptionConcurrency = 1` and sends two back-to-back `subscribe` messages. Under the previous implicit default the two subscriptions would interleave; with `concurrency = 1` the assertion is that all four responses for the first subscription id (3 × `next` + `complete`) arrive before any response for the second, which is observable and would have been impossible without exposing the knob. The scope is intentionally narrow: only the plumbing and default are changed. No change to `TOO_MANY_REQUESTS` handling, no change to graphql-ws protocol semantics, no new public types beyond the constant and the trailing parameters. ### 🔗 Related Issues Closes #2018
1 parent f9e0ea4 commit 2f2fa3f

9 files changed

Lines changed: 110 additions & 12 deletions

File tree

servers/graphql-kotlin-ktor-server/src/main/kotlin/com/expediagroup/graphql/server/ktor/GraphQL.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,8 @@ class GraphQL(config: GraphQLConfiguration) {
169169
subscriptionHooks = config.server.subscriptions.hooks,
170170
requestHandler = requestHandler,
171171
initTimeoutMillis = config.server.subscriptions.connectionInitTimeout,
172-
objectMapper = jacksonMapperBuilder().apply(config.server.jacksonConfiguration).build()
172+
objectMapper = jacksonMapperBuilder().apply(config.server.jacksonConfiguration).build(),
173+
subscriptionConcurrency = config.server.subscriptions.subscriptionConcurrency
173174
)
174175
}
175176

servers/graphql-kotlin-ktor-server/src/main/kotlin/com/expediagroup/graphql/server/ktor/GraphQLConfiguration.kt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import com.expediagroup.graphql.generator.hooks.FlowSubscriptionSchemaGeneratorH
2626
import com.expediagroup.graphql.generator.hooks.SchemaGeneratorHooks
2727
import com.expediagroup.graphql.generator.scalars.IDValueUnboxer
2828
import com.expediagroup.graphql.server.Schema
29+
import com.expediagroup.graphql.server.execution.subscription.DEFAULT_WS_SUBSCRIPTION_CONCURRENCY
2930
import com.expediagroup.graphql.server.ktor.subscriptions.DefaultKtorGraphQLSubscriptionContextFactory
3031
import com.expediagroup.graphql.server.ktor.subscriptions.DefaultKtorGraphQLSubscriptionHooks
3132
import com.expediagroup.graphql.server.ktor.subscriptions.DefaultKtorGraphQLSubscriptionRequestParser
@@ -284,6 +285,14 @@ class GraphQLConfiguration(config: ApplicationConfig) {
284285
var hooks: KtorGraphQLSubscriptionHooks = DefaultKtorGraphQLSubscriptionHooks()
285286
/** Server timeout between establishing web socket connection and receiving connection-init message */
286287
var connectionInitTimeout: Long = config.tryGetString("graphql.server.subscription.connectionInitTimeout")?.toLongOrNull() ?: 60_000
288+
/**
289+
* Maximum number of inbound client messages processed concurrently per web socket session. Defaults to
290+
* [DEFAULT_WS_SUBSCRIPTION_CONCURRENCY] (16). Raise this when a single session may hold more than the default
291+
* number of simultaneous subscriptions, otherwise additional messages (including ping/complete/subscribe)
292+
* are back-pressured until one of the in-flight messages completes.
293+
*/
294+
var subscriptionConcurrency: Int =
295+
config.tryGetString("graphql.server.subscription.concurrency")?.toIntOrNull() ?: DEFAULT_WS_SUBSCRIPTION_CONCURRENCY
287296
}
288297
}
289298

servers/graphql-kotlin-ktor-server/src/main/kotlin/com/expediagroup/graphql/server/ktor/subscriptions/KtorGraphQLWebSocketServer.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.expediagroup.graphql.server.ktor.subscriptions
1818

1919
import com.expediagroup.graphql.server.execution.GraphQLRequestHandler
20+
import com.expediagroup.graphql.server.execution.subscription.DEFAULT_WS_SUBSCRIPTION_CONCURRENCY
2021
import com.expediagroup.graphql.server.execution.subscription.GraphQLWebSocketServer
2122
import com.expediagroup.graphql.server.types.GraphQLSubscriptionStatus
2223
import io.ktor.server.websocket.WebSocketServerSession
@@ -36,9 +37,10 @@ class KtorGraphQLWebSocketServer(
3637
subscriptionHooks: KtorGraphQLSubscriptionHooks,
3738
requestHandler: GraphQLRequestHandler,
3839
initTimeoutMillis: Long,
39-
objectMapper: ObjectMapper
40+
objectMapper: ObjectMapper,
41+
subscriptionConcurrency: Int = DEFAULT_WS_SUBSCRIPTION_CONCURRENCY
4042
) : GraphQLWebSocketServer<WebSocketServerSession, Unit>(
41-
requestParser, contextFactory, subscriptionHooks, requestHandler, initTimeoutMillis, objectMapper
43+
requestParser, contextFactory, subscriptionHooks, requestHandler, initTimeoutMillis, objectMapper, subscriptionConcurrency
4244
) {
4345
override suspend fun closeSession(session: WebSocketServerSession, reason: GraphQLSubscriptionStatus) {
4446
session.close(CloseReason(reason.code.toShort(), reason.reason))

servers/graphql-kotlin-server/src/main/kotlin/com/expediagroup/graphql/server/execution/subscription/GraphQLWebSocketServer.kt

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,21 @@ import kotlin.coroutines.EmptyCoroutineContext
5757
const val GRAPHQL_WS_PROTOCOL = "graphql-transport-ws"
5858

5959
/**
60-
* GraphQL Web Socket server implementation for handling subscriptions using *graphql-transport-ws* protocol
60+
* Default maximum number of in-flight inbound messages processed concurrently by a single web socket session.
61+
*
62+
* Matches the historical default of `kotlinx.coroutines.flow.flatMapMerge` (16) that was used before this value
63+
* was made configurable. Raising the limit lets a single session process more simultaneous subscriptions at the
64+
* cost of higher peak memory; `Int.MAX_VALUE` effectively removes the limit; `1` serializes message processing.
65+
*/
66+
const val DEFAULT_WS_SUBSCRIPTION_CONCURRENCY: Int = 16
67+
68+
/**
69+
* GraphQL Web Socket server implementation for handling subscriptions using *graphql-transport-ws* protocol.
70+
*
71+
* @param subscriptionConcurrency maximum number of inbound client messages processed concurrently by the
72+
* `flatMapMerge` operator that drives [handleSubscription]. At the default ([DEFAULT_WS_SUBSCRIPTION_CONCURRENCY])
73+
* in-flight subscriptions can back-pressure sibling protocol messages (ping, complete, new subscribe) once the
74+
* ceiling is hit; see issue #2018. Pass a larger value, or `Int.MAX_VALUE`, to avoid this.
6175
*
6276
* @see <a href="https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md">graphql-transport-ws protocol</a>
6377
*/
@@ -67,7 +81,8 @@ abstract class GraphQLWebSocketServer<Session, Message>(
6781
private val subscriptionHooks: GraphQLSubscriptionHooks<Session>,
6882
private val requestHandler: GraphQLRequestHandler,
6983
private val initTimeoutMillis: Long,
70-
private val objectMapper: ObjectMapper = jacksonObjectMapper()
84+
private val objectMapper: ObjectMapper = jacksonObjectMapper(),
85+
private val subscriptionConcurrency: Int = DEFAULT_WS_SUBSCRIPTION_CONCURRENCY
7186
) {
7287
private val logger: Logger = LoggerFactory.getLogger(GraphQLWebSocketServer::class.java)
7388
private val subscriptionScope = CoroutineScope(SupervisorJob())
@@ -86,7 +101,7 @@ abstract class GraphQLWebSocketServer<Session, Message>(
86101

87102
requestParser.parseRequestFlow(session)
88103
.map { objectMapper.readValue<GraphQLSubscriptionMessage>(it) }
89-
.flatMapMerge { message ->
104+
.flatMapMerge(concurrency = subscriptionConcurrency) { message ->
90105
channelFlow {
91106
when (message) {
92107
is SubscriptionMessageConnectionInit -> {

servers/graphql-kotlin-server/src/test/kotlin/com/expediagroup/graphql/server/execution/subscription/GraphQLWebSocketServerTest.kt

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,60 @@ class GraphQLWebSocketServerTest {
300300
subscriptionJob.cancelAndJoin()
301301
}
302302

303+
@Test
304+
fun `verify subscription flow honors configured concurrency`() = runTest {
305+
// concurrency=1 serializes inbound message processing: each subscribe's channelFlow must fully complete
306+
// (counter 1,2,3 + complete) before the next subscribe starts flowing. With the historical default (16)
307+
// the two subscriptions would interleave. See issue #2018.
308+
val handler = GraphQLRequestHandler(graphQL = testGraphQLEngine())
309+
val testServer = InMemoryGraphQLSubscriptionServer(
310+
requestHandler = handler,
311+
subscriptionConcurrency = 1
312+
)
313+
314+
val session = Channel<String>(Channel.BUFFERED)
315+
val responseChannel = testServer.outboundChannel
316+
317+
val subscriptionJob = launch {
318+
testServer.handleSubscription(session)
319+
.collect()
320+
}
321+
322+
session.send(mapper.writeValueAsString(SubscriptionMessageConnectionInit()))
323+
val ack: GraphQLSubscriptionMessage = mapper.readValue(responseChannel.receive())
324+
assertEquals(GRAPHQL_WS_CONNECTION_ACK, ack.type)
325+
326+
val firstId = UUID.randomUUID().toString()
327+
val secondId = UUID.randomUUID().toString()
328+
val request = GraphQLRequest(query = "subscription { counter }")
329+
session.send(mapper.writeValueAsString(SubscriptionMessageSubscribe(id = firstId, payload = request)))
330+
session.send(mapper.writeValueAsString(SubscriptionMessageSubscribe(id = secondId, payload = request)))
331+
332+
// With concurrency=1 the second subscribe is held behind the first, so every response for firstId
333+
// (3 next + 1 complete) must arrive before any response for secondId.
334+
val firstResponseIds = (1..4).map {
335+
val msg: GraphQLSubscriptionMessage = mapper.readValue(responseChannel.receive())
336+
when (msg) {
337+
is SubscriptionMessageNext -> msg.id
338+
is SubscriptionMessageComplete -> msg.id
339+
else -> error("unexpected message type: $msg")
340+
}
341+
}
342+
assertTrue(firstResponseIds.all { it == firstId }, "expected all first-batch ids == $firstId but got $firstResponseIds")
343+
344+
val secondResponseIds = (1..4).map {
345+
val msg: GraphQLSubscriptionMessage = mapper.readValue(responseChannel.receive())
346+
when (msg) {
347+
is SubscriptionMessageNext -> msg.id
348+
is SubscriptionMessageComplete -> msg.id
349+
else -> error("unexpected message type: $msg")
350+
}
351+
}
352+
assertTrue(secondResponseIds.all { it == secondId }, "expected all second-batch ids == $secondId but got $secondResponseIds")
353+
354+
subscriptionJob.cancelAndJoin()
355+
}
356+
303357
private fun testGraphQLEngine(): GraphQL = GraphQL.newGraphQL(
304358
toSchema(
305359
config = SchemaGeneratorConfig(

servers/graphql-kotlin-server/src/test/kotlin/com/expediagroup/graphql/server/execution/subscription/InMemoryGraphQLSubscriptionServer.kt

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,15 @@ class InMemoryGraphQLSubscriptionServer(
3030
requestParser: InMemorySubscriptionRequestParser = InMemorySubscriptionRequestParser(),
3131
contextFactory: InMemorySubscriptionContextFactory = InMemorySubscriptionContextFactory(),
3232
hooks: InMemorySubscriptionHooks = InMemorySubscriptionHooks(),
33-
timeoutInMillis: Long = 1000
33+
timeoutInMillis: Long = 1000,
34+
subscriptionConcurrency: Int = DEFAULT_WS_SUBSCRIPTION_CONCURRENCY
3435
) : GraphQLWebSocketServer<Channel<String>, String>(
35-
requestParser, contextFactory, hooks, requestHandler, timeoutInMillis
36+
requestParser = requestParser,
37+
contextFactory = contextFactory,
38+
subscriptionHooks = hooks,
39+
requestHandler = requestHandler,
40+
initTimeoutMillis = timeoutInMillis,
41+
subscriptionConcurrency = subscriptionConcurrency
3642
) {
3743
val outboundChannel = Channel<String>(Channel.BUFFERED)
3844

servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/GraphQLConfigurationProperties.kt

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.expediagroup.graphql.server.spring
1818

19+
import com.expediagroup.graphql.server.execution.subscription.DEFAULT_WS_SUBSCRIPTION_CONCURRENCY
1920
import org.springframework.boot.context.properties.ConfigurationProperties
2021
import org.springframework.boot.context.properties.NestedConfigurationProperty
2122

@@ -91,7 +92,14 @@ data class GraphQLConfigurationProperties(
9192
/** Server timeout between establishing web socket connection and receiving connection-init message. */
9293
val connectionInitTimeout: Long = 60_000,
9394
/** WebSocket based subscription protocol */
94-
val protocol: SubscriptionProtocol = SubscriptionProtocol.GRAPHQL_WS
95+
val protocol: SubscriptionProtocol = SubscriptionProtocol.GRAPHQL_WS,
96+
/**
97+
* Maximum number of inbound client messages processed concurrently per web socket session. Defaults to
98+
* [DEFAULT_WS_SUBSCRIPTION_CONCURRENCY] (16). Raise this when a single session may hold more than the
99+
* default number of simultaneous subscriptions, otherwise additional messages (including ping/complete/
100+
* subscribe) are back-pressured until one of the in-flight messages completes.
101+
*/
102+
val subscriptionConcurrency: Int = DEFAULT_WS_SUBSCRIPTION_CONCURRENCY
95103
)
96104

97105
/**

servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/SubscriptionGraphQLWsAutoConfiguration.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ class SubscriptionGraphQLWsAutoConfiguration {
6363
subscriptionHooks,
6464
handler,
6565
config.subscriptions.connectionInitTimeout,
66-
objectMapper
66+
objectMapper,
67+
config.subscriptions.subscriptionConcurrency
6768
)
6869
}

servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/subscriptions/SubscriptionWebSocketHandler.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.expediagroup.graphql.server.spring.subscriptions
1818

1919
import com.expediagroup.graphql.server.execution.GraphQLRequestHandler
20+
import com.expediagroup.graphql.server.execution.subscription.DEFAULT_WS_SUBSCRIPTION_CONCURRENCY
2021
import com.expediagroup.graphql.server.execution.subscription.GRAPHQL_WS_PROTOCOL
2122
import com.expediagroup.graphql.server.execution.subscription.GraphQLWebSocketServer
2223
import com.expediagroup.graphql.server.types.GraphQLSubscriptionStatus
@@ -40,9 +41,10 @@ class SubscriptionWebSocketHandler(
4041
subscriptionHooks: SpringGraphQLSubscriptionHooks,
4142
graphqlHandler: GraphQLRequestHandler,
4243
initTimeoutMillis: Long,
43-
objectMapper: ObjectMapper
44+
objectMapper: ObjectMapper,
45+
subscriptionConcurrency: Int = DEFAULT_WS_SUBSCRIPTION_CONCURRENCY
4446
) : WebSocketHandler, GraphQLWebSocketServer<WebSocketSession, WebSocketMessage>(
45-
requestParser, contextFactory, subscriptionHooks, graphqlHandler, initTimeoutMillis, objectMapper
47+
requestParser, contextFactory, subscriptionHooks, graphqlHandler, initTimeoutMillis, objectMapper, subscriptionConcurrency
4648
) {
4749
override fun handle(session: WebSocketSession): Mono<Void> = session.send(
4850
flux {

0 commit comments

Comments
 (0)