Skip to content

Commit 8b73bc8

Browse files
committed
perf(http_proxy): stream the response to avoid memory bloat
1 parent 3a2643e commit 8b73bc8

File tree

4 files changed

+241
-43
lines changed

4 files changed

+241
-43
lines changed

gateway/src/apicast/http_proxy.lua

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ local file_reader = require("resty.file").file_reader
1313
local file_size = require("resty.file").file_size
1414
local client_body_reader = require("resty.http.request_reader").get_client_body_reader
1515
local send_response = require("resty.http.response_writer").send_response
16-
local concat = table.concat
16+
local proxy_response = require("resty.http.response_writer").proxy_response
1717

1818
local _M = { }
1919

@@ -163,13 +163,18 @@ local function forward_https_request(proxy_uri, uri, proxy_opts)
163163

164164
if res then
165165
if opts.request_unbuffered and raw then
166-
local bytes, err = send_response(sock, res, DEFAULT_CHUNKSIZE)
167-
if not bytes then
166+
err = send_response(sock, res, DEFAULT_CHUNKSIZE)
167+
if err then
168168
ngx.log(ngx.ERR, "failed to send response: ", err)
169-
return sock:send("HTTP/1.1 502 Bad Gateway")
169+
sock:close()
170+
return ngx.exit(ngx.HTTP_BAD_GATEWAY)
170171
end
171172
else
172-
httpc:proxy_response(res)
173+
err = proxy_response(res, DEFAULT_CHUNKSIZE)
174+
if err then
175+
ngx.log(ngx.ERR, 'failed to proxy request to: ', proxy_uri, ' err : ', err)
176+
return ngx.exit(ngx.HTTP_BAD_GATEWAY)
177+
end
173178
httpc:set_keepalive()
174179
end
175180
else

gateway/src/resty/http/response_writer.lua

Lines changed: 62 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ local str_lower = string.lower
33
local insert = table.insert
44
local concat = table.concat
55

6+
local ngx = ngx
7+
68
local _M = {
79
}
810

@@ -31,6 +33,45 @@ local function send(socket, data)
3133
return socket:send(data)
3234
end
3335

36+
local function send_chunk(chunk)
37+
if not chunk then
38+
return nil
39+
end
40+
41+
local ok, err = ngx.print(chunk)
42+
if not ok then
43+
return "output response failed: " .. (err or "")
44+
end
45+
46+
return nil
47+
end
48+
49+
-- forward_body reads chunks from a body_reader and passes them to the callback
50+
-- function cb.
51+
-- cb(chunk) should return a true on success, or nil/false, err on failure.
52+
local function forward_body(reader, cb, chunksize)
53+
if not reader then
54+
return "no body reader"
55+
end
56+
57+
local buffer_size = chunksize or 65536
58+
59+
repeat
60+
local buffer, read_err, send_err
61+
buffer, read_err = reader(buffer_size)
62+
if read_err then
63+
return "failed to read response body: " .. read_err
64+
end
65+
66+
if buffer then
67+
send_err = cb(buffer)
68+
if send_err then
69+
return "failed to send response body: " .. (send_err or "unknown")
70+
end
71+
end
72+
until not buffer
73+
end
74+
3475
-- write_response writes response body reader to sock in the HTTP/1.x server response format,
3576
-- The connection is closed if send() fails or when returning a non-zero
3677
function _M.send_response(sock, response, chunksize)
@@ -42,7 +83,7 @@ function _M.send_response(sock, response, chunksize)
4283
end
4384

4485
if not sock then
45-
return nil, "socket not initialized yet"
86+
return "socket not initialized yet"
4687
end
4788

4889
-- Build status line + headers into a single buffer to minimize send() calls
@@ -62,32 +103,31 @@ function _M.send_response(sock, response, chunksize)
62103

63104
local bytes, err = sock:send(concat(buf))
64105
if not bytes then
65-
return nil, "failed to send headers, err: " .. (err or "unknown")
66-
end
67-
68-
-- Write body
69-
local reader = response.body_reader
70-
if not reader then
71-
return nil, "no body reader"
106+
return "failed to send headers, err: " .. (err or "unknown")
72107
end
73108

74-
repeat
75-
local chunk, read_err
76-
77-
chunk, read_err = reader(chunksize)
78-
if read_err then
79-
return nil, "failed to read response body, err: " .. (err or "unknown")
109+
return forward_body(response.body_reader, function(chunk)
110+
bytes, err = send(sock, chunk)
111+
if not bytes then
112+
return "failed to send response body, err: " .. (err or "unknown")
80113
end
114+
end, chunksize)
115+
end
81116

82-
if chunk then
83-
bytes, err = send(sock, chunk)
84-
if not bytes then
85-
return nil, "failed to send response body, err: " .. (err or "unknown")
86-
end
87-
end
88-
until not chunk
117+
function _M.proxy_response(res, chunksize)
118+
if not res then
119+
ngx.log(ngx.ERR, "no response provided")
120+
return
121+
end
122+
123+
ngx.status = res.status
124+
for k, v in pairs(res.headers) do
125+
if not HOP_BY_HOP_HEADERS[str_lower(k)] then
126+
ngx.header[k] = v
127+
end
128+
end
89129

90-
return true, nil
130+
return forward_body(res.body_reader, send_chunk, chunksize)
91131
end
92132

93133
return _M

spec/http_proxy_spec.lua

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ describe('http_proxy', function()
2020

2121
local resty_http_proxy = require 'resty.http.proxy'
2222
stub(resty_http_proxy, 'new', function() return httpc end)
23+
local http_writer = require 'resty.http.response_writer'
24+
stub(http_writer, 'proxy_response')
2325
end
2426

2527
before_each(function()

spec/resty/http/response_writer_spec.lua

Lines changed: 167 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,12 @@ describe('resty.http.response_writer', function()
3636
describe('.send_response', function()
3737

3838
it('returns nil when no response is provided', function()
39-
local ok, err = response_writer.send_response(mock_sock, nil)
40-
assert.is_nil(ok)
39+
local err = response_writer.send_response(mock_sock, nil)
4140
assert.is_nil(err)
4241
end)
4342

4443
it('returns error when no socket is provided', function()
45-
local ok, err = response_writer.send_response(nil, make_response())
46-
assert.is_nil(ok)
44+
local err = response_writer.send_response(nil, make_response())
4745
assert.equal("socket not initialized yet", err)
4846
end)
4947

@@ -145,18 +143,16 @@ describe('resty.http.response_writer', function()
145143

146144
it('returns true on success', function()
147145
local response = make_response()
148-
local ok, err = response_writer.send_response(mock_sock, response)
146+
local err = response_writer.send_response(mock_sock, response)
149147

150-
assert.is_true(ok)
151148
assert.is_nil(err)
152149
end)
153150

154151
it('returns error when headers send fails', function()
155152
mock_sock.send = function() return nil, "closed" end
156153
local response = make_response()
157-
local ok, err = response_writer.send_response(mock_sock, response)
154+
local err = response_writer.send_response(mock_sock, response)
158155

159-
assert.is_nil(ok)
160156
assert.truthy(string.find(err, "failed to send headers"))
161157
end)
162158

@@ -171,9 +167,8 @@ describe('resty.http.response_writer', function()
171167
end
172168

173169
local response = make_response({ chunks = { "hello" } })
174-
local ok, err = response_writer.send_response(mock_sock, response)
170+
local err = response_writer.send_response(mock_sock, response)
175171

176-
assert.is_nil(ok)
177172
assert.truthy(string.find(err, "failed to send response body"))
178173
end)
179174

@@ -186,9 +181,8 @@ describe('resty.http.response_writer', function()
186181
return nil, "read error"
187182
end
188183
}
189-
local ok, err = response_writer.send_response(mock_sock, response)
184+
local err = response_writer.send_response(mock_sock, response)
190185

191-
assert.is_nil(ok)
192186
assert.truthy(string.find(err, "failed to read response body"))
193187
end)
194188

@@ -198,9 +192,8 @@ describe('resty.http.response_writer', function()
198192
reason = "OK",
199193
headers = {},
200194
}
201-
local ok, err = response_writer.send_response(mock_sock, response)
195+
local err = response_writer.send_response(mock_sock, response)
202196

203-
assert.is_nil(ok)
204197
assert.equal("no body reader", err)
205198
end)
206199

@@ -238,10 +231,168 @@ describe('resty.http.response_writer', function()
238231

239232
it('handles empty body', function()
240233
local response = make_response({ chunks = {} })
241-
local ok, err = response_writer.send_response(mock_sock, response)
234+
local err = response_writer.send_response(mock_sock, response)
235+
236+
assert.is_nil(err)
237+
end)
238+
end)
239+
240+
describe('.proxy_response', function()
241+
local printed_data
242+
local headers_set
243+
244+
before_each(function()
245+
printed_data = {}
246+
headers_set = {}
247+
248+
ngx.status = nil
249+
ngx.header = setmetatable({}, {
250+
__newindex = function(_, k, v)
251+
headers_set[k] = v
252+
end
253+
})
254+
255+
stub(ngx, 'print', function(data)
256+
table.insert(printed_data, data)
257+
return true
258+
end)
259+
stub(ngx, 'flush', function() return true end)
260+
end)
261+
262+
it('returns nil when no response is provided', function()
263+
local ok = response_writer.proxy_response(nil)
264+
assert.is_nil(ok)
265+
end)
266+
267+
it('sets ngx.status from response', function()
268+
local res = make_response({ status = 201 })
269+
local err = response_writer.proxy_response(res)
270+
271+
assert.is_nil(err)
272+
assert.equal(201, ngx.status)
273+
end)
274+
275+
it('sets response headers on ngx.header', function()
276+
local res = make_response({
277+
headers = {
278+
["Content-Type"] = "application/json",
279+
["X-Request-Id"] = "abc123",
280+
}
281+
})
282+
local err = response_writer.proxy_response(res)
283+
284+
assert.is_nil(err)
285+
assert.equal("application/json", headers_set["Content-Type"])
286+
assert.equal("abc123", headers_set["X-Request-Id"])
287+
end)
288+
289+
it('filters hop-by-hop headers', function()
290+
local res = make_response({
291+
headers = {
292+
["Connection"] = "keep-alive",
293+
["Keep-Alive"] = "timeout=5",
294+
["Transfer-Encoding"] = "chunked",
295+
["Proxy-Authenticate"] = "Basic",
296+
["Proxy-Authorization"] = "Basic abc",
297+
["TE"] = "trailers",
298+
["Trailers"] = "Expires",
299+
["Upgrade"] = "websocket",
300+
["Content-Length"] = "5",
301+
["X-Custom"] = "kept",
302+
}
303+
})
304+
local err = response_writer.proxy_response(res)
305+
306+
assert.is_nil(err)
307+
assert.is_nil(headers_set["Connection"])
308+
assert.is_nil(headers_set["Keep-Alive"])
309+
assert.is_nil(headers_set["Transfer-Encoding"])
310+
assert.is_nil(headers_set["Proxy-Authenticate"])
311+
assert.is_nil(headers_set["Proxy-Authorization"])
312+
assert.is_nil(headers_set["TE"])
313+
assert.is_nil(headers_set["Trailers"])
314+
assert.is_nil(headers_set["Upgrade"])
315+
assert.is_nil(headers_set["Content-Length"])
316+
assert.equal("kept", headers_set["X-Custom"])
317+
end)
318+
319+
it('filters hop-by-hop headers case-insensitively', function()
320+
local res = make_response({
321+
headers = {
322+
["CONNECTION"] = "close",
323+
["TRANSFER-ENCODING"] = "chunked",
324+
}
325+
})
326+
local err = response_writer.proxy_response(res)
327+
328+
assert.is_nil(err)
329+
assert.is_nil(headers_set["CONNECTION"])
330+
assert.is_nil(headers_set["TRANSFER-ENCODING"])
331+
end)
332+
333+
it('prints and flushes body chunks', function()
334+
local res = make_response({ chunks = { "chunk1", "chunk2" } })
335+
local err = response_writer.proxy_response(res)
242336

243-
assert.is_true(ok)
244337
assert.is_nil(err)
338+
assert.stub(ngx.print).was_called(2)
339+
assert.stub(ngx.print).was_called_with("chunk1")
340+
assert.stub(ngx.print).was_called_with("chunk2")
341+
-- assert.stub(ngx.flush).was_called(2)
342+
end)
343+
344+
it('returns true on success', function()
345+
-- local res = make_response({ chunks = { "data" } })
346+
-- local ok, err = response_writer.proxy_response(res)
347+
348+
-- assert.is_true(ok)
349+
-- assert.is_nil(err)
350+
end)
351+
352+
it('handles empty body', function()
353+
local res = make_response({ chunks = {} })
354+
local err = response_writer.proxy_response(res)
355+
356+
assert.is_nil(err)
357+
assert.stub(ngx.print).was_not_called()
358+
end)
359+
360+
it('returns error when ngx.print fails', function()
361+
ngx.print:revert()
362+
stub(ngx, 'print', function() return nil, "broken pipe" end)
363+
364+
local res = make_response({ chunks = { "data" } })
365+
local err = response_writer.proxy_response(res)
366+
367+
assert.truthy(string.find(err, "output response failed"))
368+
end)
369+
370+
it('returns error when body reader fails', function()
371+
local res = {
372+
status = 200,
373+
headers = {},
374+
body_reader = function()
375+
return nil, "read timeout"
376+
end
377+
}
378+
local err = response_writer.proxy_response(res)
379+
380+
assert.truthy(string.find(err, "failed to read response body"))
381+
end)
382+
383+
it('passes chunksize to body_reader', function()
384+
local received_chunksize
385+
local res = {
386+
status = 200,
387+
headers = {},
388+
body_reader = function(size)
389+
received_chunksize = size
390+
return nil
391+
end
392+
}
393+
response_writer.proxy_response(res, 2048)
394+
395+
assert.equal(2048, received_chunksize)
245396
end)
246397
end)
247398
end)

0 commit comments

Comments
 (0)