Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import kotlinx.serialization.json.JsonObject
public const val LATEST_PROTOCOL_VERSION: String = "2025-11-25"

/** The default protocol version used when negotiation is not performed. */
public const val DEFAULT_NEGOTIATED_PROTOCOL_VERSION: String = "2025-06-18"
public const val DEFAULT_NEGOTIATED_PROTOCOL_VERSION: String = "2025-03-26"

/** All MCP protocol versions supported by this SDK. */
public val SUPPORTED_PROTOCOL_VERSIONS: List<String> = listOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ public class StreamableHttpServerTransport(private val configuration: Configurat
)
return
}
if (!validateProtocolVersion(call)) return
if (messages.size > 1) {
call.reject(
HttpStatusCode.BadRequest,
Expand All @@ -393,11 +394,23 @@ public class StreamableHttpServerTransport(private val configuration: Configurat
return
}

// Extract protocol version for priming event decision.
// For initialize requests, get from request params.
// For other requests, get from header (already validated).
val clientProtocolVersion = if (isInitializationRequest) {
val initRequest = messages.first() as JSONRPCRequest
(initRequest.params as? JsonObject)?.get("protocolVersion")
?.let { McpJson.decodeFromJsonElement<String>(it) }
?: DEFAULT_NEGOTIATED_PROTOCOL_VERSION
Comment thread
devcrocod marked this conversation as resolved.
} else {
call.request.header(MCP_PROTOCOL_VERSION_HEADER) ?: DEFAULT_NEGOTIATED_PROTOCOL_VERSION
}

val streamId = Uuid.random().toString()
if (!configuration.enableJsonResponse) {
call.appendSseHeaders()
flushSse(session) // flush headers immediately
maybeSendPrimingEvent(streamId, session, call.request.header(MCP_PROTOCOL_VERSION_HEADER))
maybeSendPrimingEvent(streamId, session, clientProtocolVersion)
}

streamMutex.withLock {
Expand Down Expand Up @@ -456,7 +469,9 @@ public class StreamableHttpServerTransport(private val configuration: Configurat
// SSE headers (Content-Type, Cache-Control, Connection) are already set by the framework's SSE handler
flushSse(sseSession)
streamsMapping[STANDALONE_SSE_STREAM_ID] = SessionContext(sseSession, call)
maybeSendPrimingEvent(STANDALONE_SSE_STREAM_ID, sseSession, call.request.header(MCP_PROTOCOL_VERSION_HEADER))
val clientProtocolVersion =
call.request.header(MCP_PROTOCOL_VERSION_HEADER) ?: DEFAULT_NEGOTIATED_PROTOCOL_VERSION
maybeSendPrimingEvent(STANDALONE_SSE_STREAM_ID, sseSession, clientProtocolVersion)
sseSession.coroutineContext.job.invokeOnCompletion {
streamsMapping.remove(STANDALONE_SSE_STREAM_ID)
}
Expand Down Expand Up @@ -568,9 +583,9 @@ public class StreamableHttpServerTransport(private val configuration: Configurat
return false
}

val sessionHeaderValues = call.request.headers.getAll(MCP_SESSION_ID_HEADER)
val headerId = call.request.header(MCP_SESSION_ID_HEADER)

if (sessionHeaderValues.isNullOrEmpty()) {
if (headerId == null) {
call.reject(
HttpStatusCode.BadRequest,
RPCError.ErrorCode.CONNECTION_CLOSED,
Expand All @@ -579,17 +594,6 @@ public class StreamableHttpServerTransport(private val configuration: Configurat
return false
}

Comment thread
devcrocod marked this conversation as resolved.
if (sessionHeaderValues.size > 1) {
call.reject(
HttpStatusCode.BadRequest,
RPCError.ErrorCode.CONNECTION_CLOSED,
"Bad Request: Mcp-Session-Id header must be a single value",
)
return false
}

val headerId = sessionHeaderValues.single()

return when (headerId) {
sessionId -> true

Expand All @@ -605,8 +609,7 @@ public class StreamableHttpServerTransport(private val configuration: Configurat
}

private suspend fun validateProtocolVersion(call: ApplicationCall): Boolean {
val protocolVersions = call.request.headers.getAll(MCP_PROTOCOL_VERSION_HEADER)
val version = protocolVersions?.lastOrNull() ?: DEFAULT_NEGOTIATED_PROTOCOL_VERSION
val version = call.request.headers[MCP_PROTOCOL_VERSION_HEADER] ?: return true

Comment thread
devcrocod marked this conversation as resolved.
return when (version) {
!in SUPPORTED_PROTOCOL_VERSIONS -> {
Expand Down Expand Up @@ -715,14 +718,14 @@ public class StreamableHttpServerTransport(private val configuration: Configurat
private suspend fun maybeSendPrimingEvent(
streamId: String,
session: ServerSSESession?,
clientProtocolVersion: String? = null,
clientProtocolVersion: String,
) {
val store = configuration.eventStore
if (store == null || session == null) return
// Priming events have empty data which older clients cannot handle.
// Only send priming events to clients with protocol version >= 2025-11-25
// which includes the fix for handling empty SSE data.
if (clientProtocolVersion != null && clientProtocolVersion < MIN_PRIMING_EVENT_PROTOCOL_VERSION) return
if (clientProtocolVersion < MIN_PRIMING_EVENT_PROTOCOL_VERSION) return
try {
val primingEventId = store.storeEvent(streamId, JSONRPCEmptyMessage)
session.send(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,31 @@ class StreamableHttpServerTransportTest {
secondResponse.status shouldBe HttpStatusCode.BadRequest
}

@Test
fun `init request with unsupported protocol version returns an HTTP error`() = testApplication {
configTestServer()

val client = createTestClient()

val transport = StreamableHttpServerTransport(enableJsonResponse = true)
transport.onMessage { message ->
if (message is JSONRPCRequest) {
transport.send(JSONRPCResponse(message.id, EmptyResult()))
}
}

configureTransportEndpoint(transport)

val initResponse = client.post(path) {
addStreamableHeaders()
header("mcp-protocol-version", "1900-01-01")
setBody(buildInitializeRequestPayload())
}

initResponse.status shouldBe HttpStatusCode.BadRequest
initResponse.headers[MCP_SESSION_ID_HEADER] shouldBe null
}

@Test
fun `request with unsupported protocol version returns an HTTP error`() = testApplication {
configTestServer()
Expand All @@ -191,18 +216,15 @@ class StreamableHttpServerTransportTest {

configureTransportEndpoint(transport)

val initPayload = buildInitializeRequestPayload()
val initResponse = client.post(path) {
addStreamableHeaders()
setBody(initPayload)
setBody(buildInitializeRequestPayload())
}

initResponse.status shouldBe HttpStatusCode.OK
val sessionId = initResponse.headers[MCP_SESSION_ID_HEADER]
assertNotNull(sessionId)

// TODO When https://github.com/modelcontextprotocol/kotlin-sdk/issues/547 is fixed,
// check the incompatible mcp-protocol-version in the InitializeRequest and delete the part below
val response = client.post(path) {
addStreamableHeaders()
header("mcp-session-id", sessionId)
Expand Down
Loading