Skip to content

Commit d57f165

Browse files
committed
Merge remote-tracking branch 'origin/master' into pr13254-resolve
# Conflicts:
#	apisix/plugins/ai-providers/base.lua
2 parents 31c74ce + ecbb6fe commit d57f165

12 files changed

Lines changed: 1561 additions & 478 deletions

File tree

apisix/discovery/consul/client.lua

Lines changed: 506 additions & 0 deletions
Large diffs are not rendered by default.

apisix/discovery/consul/init.lua

Lines changed: 373 additions & 466 deletions
Large diffs are not rendered by default.

apisix/plugins/ai-providers/base.lua

Lines changed: 114 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ local log_sanitize = require("apisix.utils.log-sanitize")
3838
local protocols = require("apisix.plugins.ai-protocols")
3939
local ngx = ngx
4040
local ngx_now = ngx.now
41+
local tonumber = tonumber
4142

4243
local table = table
4344
local pairs = pairs
@@ -196,11 +197,70 @@ end
196197
-- using the client protocol module.
197198
-- @param client_proto table The protocol module for the client's protocol
198199
-- @param converter table|nil The converter module (if protocol conversion needed)
200+
-- @param conf table|nil Plugin configuration (used for response size limits)
199201
-- @return table|nil Parsed and optionally converted response body
200202
-- @return string|nil Error
201-
function _M.parse_response(self, ctx, res, client_proto, converter)
203+
function _M.parse_response(self, ctx, res, client_proto, converter, conf)
202204
local headers = res.headers
203-
local raw_res_body, err = res:read_body()
205+
206+
-- Pre-check Content-Length against max_response_bytes when the upstream
207+
-- advertises it. For responses without Content-Length (chunked), we read
208+
-- the body in bounded chunks below and enforce the cap incrementally.
209+
local max_bytes = conf and conf.max_response_bytes
210+
if max_bytes then
211+
local content_length = tonumber(headers["Content-Length"])
212+
if content_length and content_length > max_bytes then
213+
core.log.warn("aborting AI response: Content-Length ", content_length,
214+
" exceeds max_response_bytes ", max_bytes)
215+
if res._httpc then
216+
res._httpc:close()
217+
res._httpc = nil
218+
end
219+
return nil, "max_response_bytes exceeded", 502
220+
end
221+
end
222+
223+
local raw_res_body, err
224+
if max_bytes then
225+
-- Read in chunks so a runaway chunked upstream cannot force the
226+
-- worker to buffer arbitrarily many bytes before the cap trips.
227+
local body_reader = res.body_reader
228+
if not body_reader then
229+
-- Defensive: if no reader, fall back to read_body() and accept
230+
-- that the cap is only post-facto for this path.
231+
raw_res_body, err = res:read_body()
232+
else
233+
local parts = {}
234+
local total = 0
235+
while true do
236+
local chunk, read_err = body_reader()
237+
if read_err then
238+
err = read_err
239+
break
240+
end
241+
if not chunk then
242+
break
243+
end
244+
total = total + #chunk
245+
if total > max_bytes then
246+
core.log.warn("aborting AI response: body size exceeds",
247+
" max_response_bytes ", max_bytes,
248+
" (read ", total, " bytes)")
249+
if res._httpc then
250+
res._httpc:close()
251+
res._httpc = nil
252+
end
253+
return nil, "max_response_bytes exceeded", 502
254+
end
255+
parts[#parts + 1] = chunk
256+
end
257+
if not err then
258+
raw_res_body = table.concat(parts)
259+
end
260+
end
261+
else
262+
raw_res_body, err = res:read_body()
263+
end
204264
if not raw_res_body then
205265
core.log.warn("failed to read response body: ", err)
206266
return nil, err
@@ -261,7 +321,8 @@ end
261321
-- transforming events to client format.
262322
-- @param target_proto table The protocol module for the provider's native protocol
263323
-- @param converter table|nil The converter module (if protocol conversion needed)
264-
function _M.parse_streaming_response(self, ctx, res, target_proto, converter)
324+
-- @param conf table|nil Plugin configuration (used for stream duration and size limits)
325+
function _M.parse_streaming_response(self, ctx, res, target_proto, converter, conf)
265326
local body_reader = res.body_reader
266327
local contents = {}
267328
local sse_state = { is_first = true }
@@ -282,6 +343,15 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter)
282343
ctx.var.llm_request_done = true
283344
end
284345

346+
-- Runaway-upstream safeguards. Both are opt-in; unset means no cap.
347+
local max_duration_ms = conf and conf.max_stream_duration_ms
348+
local max_bytes = conf and conf.max_response_bytes
349+
local deadline
350+
if max_duration_ms then
351+
deadline = ctx.llm_request_start_time + max_duration_ms / 1000
352+
end
353+
local bytes_read = 0
354+
285355
while true do
286356
local chunk, err = body_reader()
287357
ctx.var.apisix_upstream_response_time = math.floor((ngx_now() -
@@ -306,6 +376,8 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter)
306376
return
307377
end
308378

379+
bytes_read = bytes_read + #chunk
380+
309381
if ctx.var.llm_time_to_first_token == "0" then
310382
ctx.var.llm_time_to_first_token = math.floor(
311383
(ngx_now() - ctx.llm_request_start_time) * 1000)
@@ -381,6 +453,45 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter)
381453
return
382454
end
383455
end
456+
457+
-- Enforce runaway-upstream safeguards after processing the chunk.
458+
-- Checked post-flush so clients still see any bytes we already emitted.
459+
local limit_hit
460+
if deadline and ngx_now() >= deadline then
461+
limit_hit = "max_stream_duration_ms"
462+
elseif max_bytes and bytes_read > max_bytes then
463+
limit_hit = "max_response_bytes"
464+
end
465+
if limit_hit then
466+
local duration_ms = math.floor((ngx_now() -
467+
ctx.llm_request_start_time) * 1000)
468+
core.log.warn("aborting AI stream: ", limit_hit, " exceeded;",
469+
" bytes=", bytes_read,
470+
" duration_ms=", duration_ms,
471+
" route_id=", ctx.var.route_id or "")
472+
-- Force-close upstream so we don't pool a half-drained connection.
473+
if res._httpc then
474+
res._httpc:close()
475+
res._httpc = nil
476+
end
477+
-- Signal downstream filters (e.g. moderation plugins that defer
478+
-- work until request completion) that no more content is coming.
479+
ctx.var.llm_request_done = true
480+
if output_sent then
481+
-- Client has already received partial SSE; stop feeding chunks.
482+
-- nginx will close the downstream connection at end of content
483+
-- phase. Clients detect incomplete responses via the absence
484+
-- of a protocol-specific terminator (e.g. OpenAI [DONE],
485+
-- Anthropic message_stop, Responses response.completed).
486+
return
487+
end
488+
-- No bytes flushed yet (e.g. converter skipped all events so far).
489+
-- Surface as 504 for duration (timeout-like) or 502 for size-limit
490+
-- (bad gateway response), so on_error / fallback policies can
491+
-- distinguish the failure modes.
492+
local status = limit_hit == "max_stream_duration_ms" and 504 or 502
493+
return status, limit_hit .. " exceeded"
494+
end
384495
end
385496
end
386497

apisix/plugins/ai-proxy/base.lua

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -227,13 +227,13 @@ function _M.before_proxy(conf, ctx, on_error)
227227
core.log.error("no protocol module for streaming target: ", target_proto)
228228
return 500
229229
end
230-
code = ai_provider:parse_streaming_response(
231-
ctx, res, target_proto_module, converter)
230+
code, body = ai_provider:parse_streaming_response(
231+
ctx, res, target_proto_module, converter, conf)
232232
else
233-
local _, parse_err = ai_provider:parse_response(
234-
ctx, res, client_proto, converter)
233+
local _, parse_err, parse_status = ai_provider:parse_response(
234+
ctx, res, client_proto, converter, conf)
235235
if parse_err then
236-
code = 500
236+
code = parse_status or 500
237237
body = parse_err
238238
end
239239
end

apisix/plugins/ai-proxy/schema.lua

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,22 @@ _M.ai_proxy_schema = {
183183
default = 30000,
184184
description = "timeout in milliseconds",
185185
},
186+
max_stream_duration_ms = {
187+
type = "integer",
188+
minimum = 1,
189+
description = "Maximum wall-clock duration (in milliseconds) for a "
190+
.. "streaming AI response. If the upstream keeps sending "
191+
.. "data past this deadline, the connection is closed. "
192+
.. "Unset means no cap. Use this to protect the gateway "
193+
.. "from upstream bugs that produce tokens indefinitely.",
194+
},
195+
max_response_bytes = {
196+
type = "integer",
197+
minimum = 1,
198+
description = "Maximum total bytes read from the upstream for a "
199+
.. "single AI response (streaming or non-streaming). If "
200+
.. "exceeded, the connection is closed. Unset means no cap.",
201+
},
186202
keepalive = {type = "boolean", default = true},
187203
keepalive_timeout = {
188204
type = "integer",
@@ -258,6 +274,22 @@ _M.ai_proxy_multi_schema = {
258274
default = 30000,
259275
description = "timeout in milliseconds",
260276
},
277+
max_stream_duration_ms = {
278+
type = "integer",
279+
minimum = 1,
280+
description = "Maximum wall-clock duration (in milliseconds) for a "
281+
.. "streaming AI response. If the upstream keeps sending "
282+
.. "data past this deadline, the connection is closed. "
283+
.. "Unset means no cap. Use this to protect the gateway "
284+
.. "from upstream bugs that produce tokens indefinitely.",
285+
},
286+
max_response_bytes = {
287+
type = "integer",
288+
minimum = 1,
289+
description = "Maximum total bytes read from the upstream for a "
290+
.. "single AI response (streaming or non-streaming). If "
291+
.. "exceeded, the connection is closed. Unset means no cap.",
292+
},
261293
keepalive = {type = "boolean", default = true},
262294
keepalive_timeout = {
263295
type = "integer",

docs/en/latest/plugins/ai-proxy-multi.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,9 @@ In addition, the Plugin also supports logging LLM request information in the acc
9999
| instances.checks.active.unhealthy.http_statuses | array[integer] | False | [429,404,500,501,502,503,504,505] | status code between 200 and 599 inclusive | An array of HTTP status codes that defines an unhealthy node. |
100100
| instances.checks.active.unhealthy.http_failures | integer | False | 5 | between 1 and 254 inclusive | Number of HTTP failures to define an unhealthy node. |
101101
| instances.checks.active.unhealthy.timeout | integer | False | 3 | between 1 and 254 inclusive | Number of probe timeouts to define an unhealthy node. |
102-
| timeout | integer | False | 30000 | greater than or equal to 1 | Request timeout in milliseconds when requesting the LLM service. |
102+
| timeout | integer | False | 30000 | greater than or equal to 1 | Request timeout in milliseconds when requesting the LLM service. Applied per socket operation (connect / send / read block); does not cap the total duration of a streaming response. |
103+
| max_stream_duration_ms | integer | False | | greater than or equal to 1 | Maximum wall-clock duration (in milliseconds) for a streaming AI response. If the upstream keeps sending data past this deadline, the gateway closes the connection. Unset means no cap. Use this to protect the gateway from upstream bugs that produce tokens indefinitely. When the limit is hit mid-stream, the downstream SSE stream is truncated (no protocol-specific terminator such as `[DONE]`, `message_stop`, or `response.completed`); well-behaved clients should treat a missing terminator as an incomplete response. |
104+
| max_response_bytes | integer | False | | greater than or equal to 1 | Maximum total bytes read from the upstream for a single AI response (streaming or non-streaming). If exceeded, the gateway closes the connection. For non-streaming responses with `Content-Length`, the check is performed before reading the body; for chunked (no-`Content-Length`) non-streaming responses and for streaming responses, the cap is enforced incrementally as bytes are received. Unset means no cap. |
103105
| keepalive | boolean | False | true | | If true, keep the connection alive when requesting the LLM service. |
104106
| keepalive_timeout | integer | False | 60000 | greater than or equal to 1000 | Request timeout in milliseconds when requesting the LLM service. |
105107
| keepalive_pool | integer | False | 30 | | Keepalive pool size for when connecting with the LLM service. |

docs/en/latest/plugins/ai-proxy.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,9 @@ In addition, the Plugin also supports logging LLM request information in the acc
6969
| logging | object | False | | | Logging configurations. Does not affect `error.log`. |
7070
| logging.summaries | boolean | False | false | | If true, logs request LLM model, duration, request, and response tokens. |
7171
| logging.payloads | boolean | False | false | | If true, logs request and response payload. |
72-
| timeout | integer | False | 30000 | ≥ 1 | Request timeout in milliseconds when requesting the LLM service. |
72+
| timeout | integer | False | 30000 | ≥ 1 | Request timeout in milliseconds when requesting the LLM service. Applied per socket operation (connect / send / read block); does not cap the total duration of a streaming response. |
73+
| max_stream_duration_ms | integer | False | | ≥ 1 | Maximum wall-clock duration (in milliseconds) for a streaming AI response. If the upstream keeps sending data past this deadline, the gateway closes the connection. Unset means no cap. Use this to protect the gateway from upstream bugs that produce tokens indefinitely. When the limit is hit mid-stream, the downstream SSE stream is truncated (no protocol-specific terminator such as `[DONE]`, `message_stop`, or `response.completed`); well-behaved clients should treat a missing terminator as an incomplete response. |
74+
| max_response_bytes | integer | False | | ≥ 1 | Maximum total bytes read from the upstream for a single AI response (streaming or non-streaming). If exceeded, the gateway closes the connection. For non-streaming responses with `Content-Length`, the check is performed before reading the body; for chunked (no-`Content-Length`) non-streaming responses and for streaming responses, the cap is enforced incrementally as bytes are received. Unset means no cap. |
7375
| keepalive | boolean | False | true | | If true, keeps the connection alive when requesting the LLM service. |
7476
| keepalive_timeout | integer | False | 60000 | ≥ 1000 | Keepalive timeout in milliseconds when connecting to the LLM service. |
7577
| keepalive_pool | integer | False | 30 | | Keepalive pool size for the LLM service connection. |

docs/zh/latest/plugins/ai-proxy-multi.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,9 @@ import TabItem from '@theme/TabItem';
101101
| instances.checks.active.unhealthy.http_statuses | array[integer] || [429,404,500,501,502,503,504,505] | 200 到 599 之间的状态码(包含) | 定义不健康节点的 HTTP 状态码数组。 |
102102
| instances.checks.active.unhealthy.http_failures | integer || 5 | 1 到 254(包含) | 定义不健康节点的 HTTP 失败次数。 |
103103
| instances.checks.active.unhealthy.timeout | integer || 3 | 1 到 254(包含) | 定义不健康节点的探测超时次数。 |
104-
| timeout | integer || 30000 | 大于或等于 1 | 请求 LLM 服务时的请求超时时间(毫秒)。 |
104+
| timeout | integer || 30000 | 大于或等于 1 | 请求 LLM 服务时的请求超时时间(毫秒)。应用于单次 socket 操作(连接 / 发送 / 读取块),不限制流式响应的总时长。 |
105+
| max_stream_duration_ms | integer || | 大于或等于 1 | 流式 AI 响应的总墙钟时长上限(毫秒)。若上游在此时间后仍持续发送数据,网关将关闭连接。未设置时不限制。用于防护上游持续输出 token 导致网关 CPU 被打满的异常情况。中途触发上限时,下游 SSE 流会被截断(不再发送协议特定的终止标记,例如 `[DONE]`、`message_stop`、`response.completed`),客户端应将缺失的终止标记视为响应未完成。 |
106+
| max_response_bytes | integer || | 大于或等于 1 | 单次 AI 响应(流式或非流式)允许从上游读取的最大总字节数。超出时关闭连接。非流式响应若存在 `Content-Length`,在读取 body 之前预检;否则(chunked 传输)与流式响应一样在接收字节的过程中增量检查。未设置时不限制。 |
105107
| keepalive | boolean || true | | 如果为 true,在请求 LLM 服务时保持连接活跃。 |
106108
| keepalive_timeout | integer || 60000 | 大于或等于 1000 | 请求 LLM 服务时的请求超时时间(毫秒)。 |
107109
| keepalive_pool | integer || 30 | | 连接 LLM 服务时的保活池大小。 |

0 commit comments

Comments (0)