Skip to content

Commit 2c23e82

Browse files
authored
Merge pull request #12 from Deep-CodeAI/fix/http-resource-limits
Fix/http resource limits
2 parents 280fc89 + c0e10a1 commit 2c23e82

6 files changed

Lines changed: 287 additions & 10 deletions

File tree

src/main/kotlin/agents_engine/mcp/HttpMcpTransport.kt

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ import java.net.URI
44
import java.net.http.HttpClient
55
import java.net.http.HttpRequest
66
import java.net.http.HttpResponse
7+
import kotlin.time.Duration
8+
import kotlin.time.Duration.Companion.seconds
9+
import kotlin.time.toJavaDuration
710

811
/**
912
* Streamable HTTP transport. Each `rpc()` is a POST whose response is either a JSON body
@@ -13,6 +16,8 @@ import java.net.http.HttpResponse
1316
internal class HttpMcpTransport(
1417
private val url: String,
1518
private val auth: McpAuth = McpAuth.None,
19+
private val requestTimeout: Duration = DEFAULT_REQUEST_TIMEOUT,
20+
private val maxResponseBytes: Long = DEFAULT_MAX_RESPONSE_BYTES,
1621
) : McpTransport {
1722

1823
private var sessionId: String? = null
@@ -28,20 +33,28 @@ internal class HttpMcpTransport(
2833
.uri(URI.create(url))
2934
.header("Content-Type", "application/json")
3035
.header("Accept", "application/json, text/event-stream")
36+
.timeout(requestTimeout.toJavaDuration())
3137
.also { if (sessionId != null) it.header("Mcp-Session-Id", sessionId!!) }
3238
.also { applyAuth(it) }
3339
.POST(HttpRequest.BodyPublishers.ofString(envelope))
34-
val response = http.send(builder.build(), HttpResponse.BodyHandlers.ofString())
40+
// #853 — bounded read so a malicious upstream MCP server can't OOM us.
41+
val response = http.send(builder.build(), HttpResponse.BodyHandlers.ofInputStream())
42+
val cap = maxResponseBytes.coerceAtMost(Int.MAX_VALUE.toLong()).toInt()
43+
val bytes = response.body().use { it.readNBytes(cap + 1) }
44+
if (bytes.size > cap) {
45+
error("MCP response exceeded $maxResponseBytes bytes; aborting to prevent OOM")
46+
}
47+
val bodyStr = String(bytes, Charsets.UTF_8)
3548
if (response.statusCode() !in 200..299) {
36-
error("MCP HTTP ${response.statusCode()}: ${response.body()}")
49+
error("MCP HTTP ${response.statusCode()}: $bodyStr")
3750
}
3851
response.headers().firstValue("mcp-session-id").ifPresent { sessionId = it }
3952
if (!expectBody) return ""
4053
val ct = response.headers().firstValue("content-type").orElse("")
4154
return if (ct.startsWith("text/event-stream")) {
42-
extractSseJson(response.body())
43-
?: error("MCP SSE response had no JSON data event: ${response.body()}")
44-
} else response.body()
55+
extractSseJson(bodyStr)
56+
?: error("MCP SSE response had no JSON data event: $bodyStr")
57+
} else bodyStr
4558
}
4659

4760
private fun applyAuth(builder: HttpRequest.Builder) {
@@ -52,7 +65,16 @@ internal class HttpMcpTransport(
5265
}
5366

5467
companion object {
55-
private val http: HttpClient = HttpClient.newHttpClient()
68+
// See #852.
69+
val DEFAULT_REQUEST_TIMEOUT: Duration = 60.seconds
70+
val DEFAULT_CONNECT_TIMEOUT: Duration = 10.seconds
71+
72+
// 8 MiB — MCP responses are typically small JSON-RPC envelopes. See #853.
73+
const val DEFAULT_MAX_RESPONSE_BYTES: Long = 8L * 1024 * 1024
74+
75+
private val http: HttpClient = HttpClient.newBuilder()
76+
.connectTimeout(DEFAULT_CONNECT_TIMEOUT.toJavaDuration())
77+
.build()
5678

5779
private fun extractSseJson(body: String): String? =
5880
body.lineSequence()

src/main/kotlin/agents_engine/mcp/McpServer.kt

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class McpServer private constructor(
3333
private val agent: Agent<*, *>,
3434
private val exposedSkills: List<ExposedSkill>,
3535
private val portRequest: Int,
36+
private val maxRequestBytes: Long = DEFAULT_MAX_REQUEST_BYTES,
3637
) {
3738
private var http: HttpServer? = null
3839
private val sessionId: String = java.util.UUID.randomUUID().toString()
@@ -66,7 +67,21 @@ class McpServer private constructor(
6667
respond(exchange, 415, """{"error":"Unsupported Media Type — expected application/json"}""")
6768
return
6869
}
69-
val bodyText = exchange.requestBody.bufferedReader().use { it.readText() }
70+
// #851 — bound the request body before reading. Honors Content-Length when
71+
// present; falls back to a length-bounded read otherwise. Avoids OOM from
72+
// a same-host process posting a multi-GB body to the loopback server.
73+
val declaredLength = exchange.requestHeaders.getFirst("Content-Length")?.toLongOrNull()
74+
if (declaredLength != null && declaredLength > maxRequestBytes) {
75+
respond(exchange, 413, """{"error":"Payload Too Large — limit is $maxRequestBytes bytes"}""")
76+
return
77+
}
78+
val cap = maxRequestBytes.coerceAtMost(Int.MAX_VALUE.toLong()).toInt()
79+
val bodyBytes = exchange.requestBody.use { it.readNBytes(cap + 1) }
80+
if (bodyBytes.size > cap) {
81+
respond(exchange, 413, """{"error":"Payload Too Large — limit is $maxRequestBytes bytes"}""")
82+
return
83+
}
84+
val bodyText = String(bodyBytes, Charsets.UTF_8)
7085
val request = LenientJsonParser.parse(bodyText) as? Map<*, *>
7186
?: return respond(exchange, 400, "{}")
7287
val method = request["method"] as? String ?: return respond(exchange, 400, "{}")
@@ -150,6 +165,10 @@ class McpServer private constructor(
150165
}
151166

152167
companion object {
168+
// 8 MiB — generous for tools/call payloads, far short of OOM on a typical
169+
// JVM heap. See #851.
170+
const val DEFAULT_MAX_REQUEST_BYTES: Long = 8L * 1024 * 1024
171+
153172
fun from(agent: Agent<*, *>, block: McpExposeBuilder.() -> Unit): McpServer {
154173
val builder = McpExposeBuilder().apply(block)
155174
require(builder.exposedNames.isNotEmpty()) {
@@ -165,13 +184,15 @@ class McpServer private constructor(
165184
}
166185
ExposedSkill.of(skill)
167186
}
168-
return McpServer(agent, exposed, builder.port)
187+
return McpServer(agent, exposed, builder.port, builder.maxRequestBytes)
169188
}
170189
}
171190
}
172191

173192
class McpExposeBuilder internal constructor() {
174193
var port: Int = 0 // 0 = auto-assign
194+
/** Hard cap on inbound request body size. See #851. */
195+
var maxRequestBytes: Long = McpServer.DEFAULT_MAX_REQUEST_BYTES
175196
internal val exposedNames = mutableListOf<String>()
176197

177198
fun expose(skillName: String) { exposedNames += skillName }

src/main/kotlin/agents_engine/model/OllamaClient.kt

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ import java.net.URI
66
import java.net.http.HttpClient
77
import java.net.http.HttpRequest
88
import java.net.http.HttpResponse
9+
import kotlin.time.Duration
10+
import kotlin.time.Duration.Companion.seconds
11+
import kotlin.time.toJavaDuration
912

1013
internal data class ParsedToolArguments(
1114
val arguments: Map<String, Any?>,
@@ -55,10 +58,18 @@ open class OllamaClient(
5558
private val model: String,
5659
private val temperature: Double = 0.7,
5760
private val tools: List<ToolDef> = emptyList(),
61+
/** Per-request wall-clock cap. See #852. */
62+
private val requestTimeout: Duration = DEFAULT_REQUEST_TIMEOUT,
63+
/** TCP connect timeout for the underlying HttpClient. See #852. */
64+
private val connectTimeout: Duration = DEFAULT_CONNECT_TIMEOUT,
65+
/** Hard cap on response body size — anything bigger throws. See #853. */
66+
private val maxResponseBytes: Long = DEFAULT_MAX_RESPONSE_BYTES,
5867
) : ModelClient {
5968
private val baseUrl = "http://$host:$port"
6069

61-
private val http = HttpClient.newHttpClient()
70+
private val http: HttpClient = HttpClient.newBuilder()
71+
.connectTimeout(connectTimeout.toJavaDuration())
72+
.build()
6273

6374
/**
6475
* #706: Once a model has been observed to reject native tools, skip the native
@@ -99,9 +110,33 @@ open class OllamaClient(
99110
val request = HttpRequest.newBuilder()
100111
.uri(URI.create("$baseUrl/api/chat"))
101112
.header("Content-Type", "application/json")
113+
.timeout(requestTimeout.toJavaDuration())
102114
.POST(HttpRequest.BodyPublishers.ofString(body))
103115
.build()
104-
return http.send(request, HttpResponse.BodyHandlers.ofString()).body()
116+
// #853 — bounded read so a malicious or buggy upstream can't OOM us.
117+
val response = http.send(request, HttpResponse.BodyHandlers.ofInputStream())
118+
val cap = maxResponseBytes.coerceAtMost(Int.MAX_VALUE.toLong()).toInt()
119+
val bytes = response.body().use { it.readNBytes(cap + 1) }
120+
if (bytes.size > cap) {
121+
throw LlmProviderException(
122+
"Ollama response exceeded $maxResponseBytes bytes; aborting to prevent OOM",
123+
)
124+
}
125+
return String(bytes, Charsets.UTF_8)
126+
}
127+
128+
companion object {
129+
// 60s — chat completions can be slow; large enough not to false-trip on
130+
// legitimate long responses, small enough to bound a hung Ollama instance.
131+
// See #852.
132+
val DEFAULT_REQUEST_TIMEOUT: Duration = 60.seconds
133+
134+
// 10s — TCP connect should never take this long on a healthy network.
135+
val DEFAULT_CONNECT_TIMEOUT: Duration = 10.seconds
136+
137+
// 16 MiB — LLM responses can be large but not THAT large; cap keeps OOM
138+
// off the table when the upstream is malicious or buggy. See #853.
139+
const val DEFAULT_MAX_RESPONSE_BYTES: Long = 16L * 1024 * 1024
105140
}
106141

107142
private fun isNativeToolCapabilityError(msg: String?): Boolean =
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package agents_engine.mcp
2+
3+
import agents_engine.core.agent
4+
import java.net.URI
5+
import java.net.http.HttpClient
6+
import java.net.http.HttpRequest
7+
import java.net.http.HttpResponse
8+
import kotlin.test.AfterTest
9+
import kotlin.test.Test
10+
import kotlin.test.assertEquals
11+
import kotlin.test.assertTrue
12+
13+
// Tests for #851 — McpServer caps request body size and returns 413 before
14+
// loading the entire body into memory.
15+
class McpServerBodySizeLimitTest {
16+
17+
private val toStop = mutableListOf<() -> Unit>()
18+
19+
@AfterTest fun cleanup() { toStop.forEach { runCatching { it() } } }
20+
21+
private fun trivialAgent() = agent<String, String>("greeter") {
22+
skills { skill<String, String>("greet", "Greets") { implementedBy { "hi $it" } } }
23+
}
24+
25+
private fun startServer(maxBytes: Long = McpServer.DEFAULT_MAX_REQUEST_BYTES): McpServer {
26+
val server = McpServer.from(trivialAgent()) {
27+
expose("greet")
28+
port = 0
29+
maxRequestBytes = maxBytes
30+
}.start()
31+
toStop.add { server.stop() }
32+
return server
33+
}
34+
35+
private fun postRaw(url: String, body: String, contentLength: Long? = null): HttpResponse<String> {
36+
val builder = HttpRequest.newBuilder()
37+
.uri(URI.create(url))
38+
.header("Content-Type", "application/json")
39+
.POST(HttpRequest.BodyPublishers.ofString(body))
40+
if (contentLength != null) {
41+
// The JDK HttpClient sets Content-Length automatically; this branch is here
42+
// for explicit-override tests if the framework ever needs them.
43+
builder.setHeader("Content-Length", contentLength.toString())
44+
}
45+
return HttpClient.newHttpClient().send(builder.build(), HttpResponse.BodyHandlers.ofString())
46+
}
47+
48+
@Test
49+
fun `request larger than the cap is rejected with 413`() {
50+
val server = startServer(maxBytes = 1024) // 1 KiB cap for the test
51+
val tooBig = "x".repeat(2048) // 2 KiB
52+
val response = postRaw(server.url, """{"jsonrpc":"2.0","id":1,"method":"ping","params":{"pad":"$tooBig"}}""")
53+
assertEquals(413, response.statusCode())
54+
assertTrue(
55+
response.body().contains("Payload Too Large", ignoreCase = true),
56+
"expected error body to mention size limit; got: ${response.body()}",
57+
)
58+
}
59+
60+
@Test
61+
fun `request exactly at the cap is processed normally`() {
62+
val server = startServer(maxBytes = 16 * 1024) // 16 KiB cap
63+
// Build a request that's well under the cap but uses real JSON-RPC shape.
64+
val response = postRaw(server.url, """{"jsonrpc":"2.0","id":1,"method":"ping"}""")
65+
assertEquals(200, response.statusCode())
66+
assertTrue(response.body().contains("\"result\""), "expected success: ${response.body()}")
67+
}
68+
69+
@Test
70+
fun `default cap accepts a normal-sized tools-list request`() {
71+
// Sanity: the default cap doesn't break ordinary traffic.
72+
val server = startServer() // default
73+
val response = postRaw(server.url, """{"jsonrpc":"2.0","id":1,"method":"tools/list"}""")
74+
assertEquals(200, response.statusCode())
75+
}
76+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package agents_engine.model
2+
3+
import com.sun.net.httpserver.HttpServer
4+
import java.net.InetSocketAddress
5+
import kotlin.test.AfterTest
6+
import kotlin.test.Test
7+
import kotlin.test.assertEquals
8+
import kotlin.test.assertTrue
9+
import kotlin.test.fail
10+
11+
// Tests for #853 — OllamaClient enforces a hard cap on inbound response body size.
12+
// A malicious or buggy upstream that streams unbounded bytes must not OOM the JVM.
13+
class OllamaClientResponseSizeLimitTest {
14+
15+
private val toStop = mutableListOf<() -> Unit>()
16+
17+
@AfterTest fun cleanup() { toStop.forEach { runCatching { it() } } }
18+
19+
/** Starts a stub server that returns the given bytes for any /api/chat POST. */
20+
private fun startStub(responseBytes: ByteArray): Int {
21+
val server = HttpServer.create(InetSocketAddress("127.0.0.1", 0), 0)
22+
server.createContext("/api/chat") { ex ->
23+
ex.responseHeaders.add("Content-Type", "application/json")
24+
ex.sendResponseHeaders(200, responseBytes.size.toLong())
25+
ex.responseBody.use { it.write(responseBytes) }
26+
}
27+
server.executor = null
28+
server.start()
29+
toStop.add { server.stop(0) }
30+
return server.address.port
31+
}
32+
33+
@Test
34+
fun `response above maxResponseBytes throws LlmProviderException`() {
35+
// Stub returns 5 KiB; client cap is 1 KiB.
36+
val port = startStub(ByteArray(5 * 1024) { 'x'.code.toByte() })
37+
val client = OllamaClient(
38+
host = "127.0.0.1",
39+
port = port,
40+
model = "fake",
41+
maxResponseBytes = 1024,
42+
)
43+
try {
44+
client.chat(listOf(LlmMessage("user", "hi")))
45+
fail("expected LlmProviderException for over-cap response")
46+
} catch (e: LlmProviderException) {
47+
assertTrue(
48+
e.message!!.contains("exceeded", ignoreCase = true) ||
49+
e.message!!.contains("OOM", ignoreCase = true),
50+
"expected size-limit message, got: ${e.message}",
51+
)
52+
}
53+
}
54+
55+
@Test
56+
fun `response at the cap is processed normally`() {
57+
val body = """{"message":{"content":"ok"}}""".toByteArray(Charsets.UTF_8)
58+
val port = startStub(body)
59+
val client = OllamaClient(
60+
host = "127.0.0.1",
61+
port = port,
62+
model = "fake",
63+
maxResponseBytes = 4096,
64+
)
65+
val response = client.chat(listOf(LlmMessage("user", "hi")))
66+
assertTrue(response is LlmResponse.Text, "expected text response, got: $response")
67+
assertEquals("ok", (response as LlmResponse.Text).content)
68+
}
69+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package agents_engine.model
2+
3+
import com.sun.net.httpserver.HttpServer
4+
import java.net.InetSocketAddress
5+
import java.net.http.HttpTimeoutException
6+
import kotlin.test.AfterTest
7+
import kotlin.test.Test
8+
import kotlin.test.assertTrue
9+
import kotlin.test.fail
10+
import kotlin.time.Duration.Companion.milliseconds
11+
12+
// Tests for #852 — OllamaClient enforces a per-request HTTP timeout.
13+
// A non-responsive endpoint must NOT block the caller indefinitely.
14+
class OllamaClientTimeoutTest {
15+
16+
private val toStop = mutableListOf<() -> Unit>()
17+
18+
@AfterTest fun cleanup() { toStop.forEach { runCatching { it() } } }
19+
20+
/** Starts a stub HTTP server that accepts the connection and never replies. */
21+
private fun startBlackHole(): Int {
22+
val server = HttpServer.create(InetSocketAddress("127.0.0.1", 0), 0)
23+
server.createContext("/api/chat") { /* never write a response — exchange leaks */ }
24+
server.executor = null
25+
server.start()
26+
toStop.add { server.stop(0) }
27+
return server.address.port
28+
}
29+
30+
@Test
31+
fun `request that exceeds requestTimeout throws HttpTimeoutException`() {
32+
val port = startBlackHole()
33+
val client = OllamaClient(
34+
host = "127.0.0.1",
35+
port = port,
36+
model = "fake",
37+
requestTimeout = 250.milliseconds,
38+
)
39+
40+
val started = System.nanoTime()
41+
try {
42+
client.chat(listOf(LlmMessage("user", "hi")))
43+
fail("expected HttpTimeoutException; chat returned normally")
44+
} catch (e: HttpTimeoutException) {
45+
val elapsedMs = (System.nanoTime() - started) / 1_000_000
46+
assertTrue(
47+
elapsedMs in 200..2_000,
48+
"timeout fired but elapsed=${elapsedMs}ms, expected ~250ms (sanity bound)",
49+
)
50+
} catch (e: Throwable) {
51+
fail("expected HttpTimeoutException, got ${e::class.simpleName}: ${e.message}")
52+
}
53+
}
54+
}

0 commit comments

Comments
 (0)