Skip to content

Commit 3b56627

Browse files
committed
feat: polymorphic helper for request body streaming
1 parent d3fee6e commit 3b56627

9 files changed

Lines changed: 272 additions & 2 deletions

File tree

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Changelog
22

3+
## Unreleased
4+
5+
### New features
6+
7+
* Add `Mint.HTTP.next_body_chunk/3` and `Mint.HTTP.next_body_chunk_size/2` helpers for streaming a request body of arbitrary size while respecting the connection's current send window. In HTTP/2 the chunk size is bounded by the connection-level and per-request flow-control windows; in HTTP/1 it is bounded by the OS socket send buffer (`:sndbuf`) cached at connect time.
8+
39
## v1.7.1
410

511
### Bug Fixes and Improvements

lib/mint/core/conn.ex

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,6 @@ defmodule Mint.Core.Conn do
6262
@callback put_proxy_headers(conn(), Mint.Types.headers()) :: conn()
6363

6464
@callback put_log(conn(), boolean()) :: conn()
65+
66+
@callback next_body_chunk_size(conn(), Types.request_ref()) :: non_neg_integer()
6567
end

lib/mint/http.ex

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -623,6 +623,9 @@ defmodule Mint.HTTP do
623623
624624
This function always returns an updated connection to be stored over the old connection.
625625
626+
When streaming a body of arbitrary size, use `next_body_chunk/3` to split it
627+
into chunks that respect the current send window of the connection.
628+
626629
For information about transfer encoding and content length in HTTP/1, see
627630
`Mint.HTTP1.stream_request_body/3`.
628631
@@ -1065,6 +1068,102 @@ defmodule Mint.HTTP do
10651068
@impl true
10661069
def put_proxy_headers(conn, headers), do: conn_apply(conn, :put_proxy_headers, [conn, headers])
10671070

1071+
@doc """
1072+
Returns the maximum number of body bytes that can be streamed on the connection
1073+
for the given `request_ref` right now.
1074+
1075+
The meaning of the returned value depends on the protocol of the underlying
1076+
connection:
1077+
1078+
* In HTTP/1, it is the operating-system socket send buffer (`:sndbuf`) cached
1079+
on the connection at connect time. HTTP/1 has no flow-control mechanism, so
1080+
this is purely an I/O sizing hint — passing a larger chunk to
1081+
`stream_request_body/3` is allowed and will not violate the protocol.
1082+
1083+
* In HTTP/2, it is the minimum of the connection-level and the per-request
1084+
flow-control windows. Sending more than this in a single `DATA` frame would
1085+
violate flow control, so the value is a hard upper bound. See
1086+
`Mint.HTTP2.get_window_size/2` for the underlying primitives.
1087+
1088+
Raises `ArgumentError` if `request_ref` is not associated with an active
1089+
streaming request.
1090+
1091+
See `next_body_chunk/3` for a higher-level helper that uses this value to
1092+
split a binary body into a sendable chunk.
1093+
"""
1094+
@doc since: "1.8.0"
1095+
@impl true
1096+
@spec next_body_chunk_size(t(), Types.request_ref()) :: non_neg_integer()
1097+
def next_body_chunk_size(conn, ref), do: conn_apply(conn, :next_body_chunk_size, [conn, ref])
1098+
1099+
@doc """
1100+
Splits off the next chunk of `body` that can be streamed on the connection right now.
1101+
1102+
This is a helper to be used together with `stream_request_body/3` when streaming a
1103+
request body of arbitrary size. Given a `body` binary, it inspects the connection
1104+
state via `next_body_chunk_size/2` to determine the largest chunk that can be sent
1105+
immediately, and returns that chunk together with the remainder to be streamed later.
1106+
1107+
The return value is a `{chunk, rest}` tuple such that `chunk <> rest == body`.
1108+
When `body` is empty (or smaller than the available send window), `rest` is `""`.
1109+
1110+
`body` must be a `t:binary/0`. If you have an `t:iodata/0` body, convert it
1111+
with `IO.iodata_to_binary/1` before calling this function.
1112+
1113+
This function performs no I/O — it only computes how to split the binary. You
1114+
still need to pass `chunk` to `stream_request_body/3` to actually send it on
1115+
the wire.
1116+
1117+
See `next_body_chunk_size/2` for the protocol-specific semantics of the chunk
1118+
size and a note on HTTP/1 vs HTTP/2 behavior.
1119+
1120+
## When not to use this helper
1121+
1122+
This helper is convenient for small to medium request bodies that already live
1123+
as a single binary in memory. For large bodies — especially iodata assembled
1124+
from multiple parts, refc-binaries read from files, or streamed from another
1125+
source — calling `IO.iodata_to_binary/1` just to use this helper will:
1126+
1127+
* double peak memory usage while the temporary binary is built;
1128+
* defeat the scatter-gather (`writev/2`) optimization that `:gen_tcp.send`
1129+
and `:ssl.send` perform on iolists, forcing an extra full-buffer copy.
1130+
1131+
In those cases, drive the loop yourself: query `next_body_chunk_size/2` to
1132+
learn how many bytes you can send right now, take that prefix from your iodata
1133+
source without materializing the rest, hand it to `stream_request_body/3`, and
1134+
repeat. Mint's flow-control state is fully exposed through
1135+
`next_body_chunk_size/2` — this helper is just one ergonomic shape over it,
1136+
not the only supported one.
1137+
1138+
## Examples
1139+
1140+
Streaming a body of arbitrary size by repeatedly chunking against the current
1141+
send window:
1142+
1143+
{:ok, conn, ref} = Mint.HTTP.request(conn, "POST", "/", headers, :stream)
1144+
{:ok, conn} = stream_body(conn, ref, payload)
1145+
1146+
defp stream_body(conn, ref, "") do
1147+
Mint.HTTP.stream_request_body(conn, ref, :eof)
1148+
end
1149+
1150+
defp stream_body(conn, ref, body) do
1151+
{chunk, rest} = Mint.HTTP.next_body_chunk(conn, ref, body)
1152+
1153+
with {:ok, conn} <- Mint.HTTP.stream_request_body(conn, ref, chunk) do
1154+
stream_body(conn, ref, rest)
1155+
end
1156+
end
1157+
1158+
"""
1159+
@doc since: "1.8.0"
1160+
@spec next_body_chunk(t(), Types.request_ref(), binary()) :: {binary(), binary()}
1161+
def next_body_chunk(conn, ref, body) when is_binary(body) do
1162+
chunk_size = min(next_body_chunk_size(conn, ref), byte_size(body))
1163+
<<chunk::binary-size(chunk_size), rest::binary>> = body
1164+
{chunk, rest}
1165+
end
1166+
10681167
## Helpers
10691168

10701169
defp conn_apply(%UnsafeProxy{}, fun, args), do: apply(UnsafeProxy, fun, args)

lib/mint/http1.ex

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,13 @@ defmodule Mint.HTTP1 do
102102
proxy_headers: [],
103103
private: %{},
104104
log: false,
105-
optional_responses: []
105+
optional_responses: [],
106+
# Chunk size returned by `next_body_chunk_size/2`. Seeded from the OS socket
107+
# send buffer (`:sndbuf`) at connect time, falling back to 16 KiB. Cached on
108+
# the connection so the streaming hot path doesn't pay a `getopts` syscall
109+
# per chunk. HTTP/1 has no flow-control window, so this is purely a sizing
110+
# hint and does not need to track the live socket value.
111+
body_chunk_size: 16_384
106112
]
107113

108114
defmacrop log(conn, level, message) do
@@ -204,6 +210,7 @@ defmodule Mint.HTTP1 do
204210
end
205211

206212
with :ok <- Util.inet_opts(transport, socket),
213+
{:ok, sndbuf_opts} <- transport.getopts(socket, [:sndbuf]),
207214
:ok <- if(mode == :active, do: transport.setopts(socket, active: :once), else: :ok) do
208215
conn = %__MODULE__{
209216
transport: transport,
@@ -216,7 +223,8 @@ defmodule Mint.HTTP1 do
216223
log: log?,
217224
case_sensitive_headers: Keyword.get(opts, :case_sensitive_headers, false),
218225
skip_target_validation: Keyword.get(opts, :skip_target_validation, false),
219-
optional_responses: validate_optional_response_values(opts)
226+
optional_responses: validate_optional_response_values(opts),
227+
body_chunk_size: sndbuf_opts[:sndbuf] || 16_384
220228
}
221229

222230
{:ok, conn}
@@ -667,6 +675,23 @@ defmodule Mint.HTTP1 do
667675
%{conn | proxy_headers: headers}
668676
end
669677

678+
@doc """
679+
See `Mint.HTTP.next_body_chunk_size/2`.
680+
"""
681+
@doc since: "1.8.0"
682+
@impl true
683+
@spec next_body_chunk_size(t(), Types.request_ref()) :: non_neg_integer()
684+
def next_body_chunk_size(
685+
%__MODULE__{streaming_request: %{ref: ref}, body_chunk_size: body_chunk_size},
686+
ref
687+
),
688+
do: body_chunk_size
689+
690+
def next_body_chunk_size(%__MODULE__{}, ref) do
691+
raise ArgumentError,
692+
"request with request reference #{inspect(ref)} was not found or is not streaming a body"
693+
end
694+
670695
## Helpers
671696

672697
defp decode(:status, %{request: request} = conn, data, responses) do

lib/mint/http2.ex

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1039,6 +1039,15 @@ defmodule Mint.HTTP2 do
10391039
%{conn | proxy_headers: headers}
10401040
end
10411041

1042+
@doc """
1043+
See `Mint.HTTP.next_body_chunk_size/2`.
1044+
"""
1045+
@doc since: "1.8.0"
1046+
@impl true
1047+
@spec next_body_chunk_size(t(), Types.request_ref()) :: non_neg_integer()
1048+
def next_body_chunk_size(%__MODULE__{} = conn, ref),
1049+
do: min(get_window_size(conn, :connection), get_window_size(conn, {:request, ref}))
1050+
10421051
## Helpers
10431052

10441053
defp handle_closed(conn) do

lib/mint/unsafe_proxy.ex

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,4 +199,9 @@ defmodule Mint.UnsafeProxy do
199199
def put_proxy_headers(%__MODULE__{}, _headers) do
200200
raise "invalid function for proxy unsafe proxy connections"
201201
end
202+
203+
@impl true
204+
def next_body_chunk_size(%__MODULE__{module: module, state: state}, ref) do
205+
module.next_body_chunk_size(state, ref)
206+
end
202207
end

test/http_test.exs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,52 @@
11
defmodule Mint.HTTPTest do
22
use ExUnit.Case, async: true
33
doctest Mint.HTTP
4+
5+
alias Mint.{HTTP, HTTP1.TestServer}
6+
7+
setup do
8+
{:ok, port, server_ref} = TestServer.start()
9+
assert {:ok, conn} = HTTP.connect(:http, "localhost", port)
10+
assert_receive {^server_ref, server_socket}
11+
12+
[conn: conn, server_socket: server_socket]
13+
end
14+
15+
describe "next_body_chunk_size/2" do
16+
test "returns a positive integer for an active streaming request", %{conn: conn} do
17+
{:ok, conn, ref} = HTTP.request(conn, "GET", "/", [], :stream)
18+
19+
send_window = HTTP.next_body_chunk_size(conn, ref)
20+
assert is_integer(send_window)
21+
assert send_window > 0
22+
end
23+
24+
test "raises ArgumentError for an unknown request ref", %{conn: conn} do
25+
assert_raise ArgumentError, fn ->
26+
HTTP.next_body_chunk_size(conn, make_ref())
27+
end
28+
end
29+
end
30+
31+
describe "next_body_chunk/3" do
32+
test "returns body size when body is smaller than send window", %{conn: conn} do
33+
{:ok, conn, ref} = HTTP.request(conn, "GET", "/", [], :stream)
34+
small_body = "hello"
35+
assert {"hello", ""} = HTTP.next_body_chunk(conn, ref, small_body)
36+
end
37+
38+
test "returns send window when body is larger than send window", %{conn: conn} do
39+
{:ok, conn, ref} = HTTP.request(conn, "GET", "/", [], :stream)
40+
chunk_size = HTTP.next_body_chunk_size(conn, ref)
41+
large_body = :binary.copy(<<0>>, chunk_size + 1000)
42+
assert {chunk, rest} = HTTP.next_body_chunk(conn, ref, large_body)
43+
assert byte_size(chunk) == chunk_size
44+
assert byte_size(rest) == 1000
45+
end
46+
47+
test "returns 0 for empty body", %{conn: conn} do
48+
{:ok, conn, ref} = HTTP.request(conn, "GET", "/", [], :stream)
49+
assert {"", ""} = HTTP.next_body_chunk(conn, ref, "")
50+
end
51+
end
452
end

test/mint/http1/conn_test.exs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1151,6 +1151,53 @@ defmodule Mint.HTTP1Test do
11511151
{:ok, conn, responses}
11521152
end
11531153

1154+
describe "next_body_chunk_size/2" do
1155+
test "returns a positive integer for an active streaming request", %{conn: conn} do
1156+
{:ok, conn, ref} = HTTP1.request(conn, "GET", "/", [], :stream)
1157+
1158+
send_window = HTTP1.next_body_chunk_size(conn, ref)
1159+
assert is_integer(send_window)
1160+
assert send_window > 0
1161+
end
1162+
1163+
test "returns the cached body_chunk_size value", %{conn: conn} do
1164+
{:ok, conn, ref} = HTTP1.request(conn, "GET", "/", [], :stream)
1165+
assert HTTP1.next_body_chunk_size(conn, ref) == conn.body_chunk_size
1166+
end
1167+
1168+
test "raises if no request is currently streaming a body", %{conn: conn} do
1169+
assert_raise ArgumentError, ~r/was not found or is not streaming a body/, fn ->
1170+
HTTP1.next_body_chunk_size(conn, make_ref())
1171+
end
1172+
end
1173+
1174+
test "raises if the ref does not match the active streaming request", %{conn: conn} do
1175+
{:ok, conn, _ref} = HTTP1.request(conn, "GET", "/", [], :stream)
1176+
1177+
assert_raise ArgumentError, ~r/was not found or is not streaming a body/, fn ->
1178+
HTTP1.next_body_chunk_size(conn, make_ref())
1179+
end
1180+
end
1181+
end
1182+
1183+
describe "next_body_chunk/3" do
1184+
test "iteratively drains a binary body across multiple calls", %{conn: conn} do
1185+
{:ok, conn, ref} = HTTP1.request(conn, "GET", "/", [], :stream)
1186+
body = :binary.copy(<<?a>>, conn.body_chunk_size * 3 + 17)
1187+
1188+
chunks = drain_body(conn, ref, body, [])
1189+
assert IO.iodata_to_binary(chunks) == body
1190+
assert length(chunks) == 4
1191+
end
1192+
end
1193+
1194+
defp drain_body(_conn, _ref, "", chunks), do: Enum.reverse(chunks)
1195+
1196+
defp drain_body(conn, ref, body, chunks) do
1197+
{chunk, rest} = Mint.HTTP.next_body_chunk(conn, ref, body)
1198+
drain_body(conn, ref, rest, [chunk | chunks])
1199+
end
1200+
11541201
@mint_user_agent "mint/#{Mix.Project.config()[:version]}"
11551202
defp mint_user_agent, do: @mint_user_agent
11561203
end

test/mint/http2/conn_test.exs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1772,6 +1772,35 @@ defmodule Mint.HTTP2Test do
17721772
HTTP2.get_window_size(conn, {:request, make_ref()})
17731773
end
17741774
end
1775+
1776+
test "next_body_chunk_size/2 returns the minimum of connection and request window sizes",
1777+
%{conn: conn} do
1778+
{conn, ref} = open_request(conn, :stream)
1779+
1780+
send_window = HTTP2.next_body_chunk_size(conn, ref)
1781+
conn_window = HTTP2.get_window_size(conn, :connection)
1782+
request_window = HTTP2.get_window_size(conn, {:request, ref})
1783+
1784+
assert send_window == min(conn_window, request_window)
1785+
end
1786+
1787+
test "next_body_chunk_size/2 decreases after streaming body data", %{conn: conn} do
1788+
{conn, ref} = open_request(conn, :stream)
1789+
1790+
initial_send_window = HTTP2.next_body_chunk_size(conn, ref)
1791+
assert initial_send_window > 0
1792+
1793+
body_chunk = "hello"
1794+
{:ok, conn} = HTTP2.stream_request_body(conn, ref, body_chunk)
1795+
1796+
assert HTTP2.next_body_chunk_size(conn, ref) == initial_send_window - byte_size(body_chunk)
1797+
end
1798+
1799+
test "next_body_chunk_size/2 raises if the request is not found", %{conn: conn} do
1800+
assert_raise ArgumentError, ~r/request with request reference .+ was not found/, fn ->
1801+
HTTP2.next_body_chunk_size(conn, make_ref())
1802+
end
1803+
end
17751804
end
17761805

17771806
describe "settings" do

0 commit comments

Comments
 (0)