Skip to content

Commit 4c4bf91

Browse files
tank-bohrericmj
andauthored
feat: polymorphic helper for request body streaming (#485)
* feat: polymorphic helper for request body streaming * test: tighten request_body_window/2 coverage Remove a redundant HTTP/1 test (HTTP/1 only permits one streaming request at a time, so the "different ref while streaming" case collapses into the existing make_ref() test). Add an HTTP/2 end-to-end test that exercises the documented streaming loop across multiple iterations with a body larger than the initial window, processing WINDOW_UPDATE frames between chunks. * docs(http): improves docs around request_body_window/2 * Remove changelog entry --------- Co-authored-by: Eric Meadows-Jönsson <eric.meadows.jonsson@gmail.com>
1 parent 4fca883 commit 4c4bf91

9 files changed

Lines changed: 214 additions & 1 deletion

File tree

.dialyzer_ignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
lib/mint/tunnel_proxy.ex:49
2-
lib/mint/http1.ex:915
2+
lib/mint/http1.ex:927
33
lib/mint/unsafe_proxy.ex:173
44
lib/mint/unsafe_proxy.ex:198
55
test/support

lib/mint/core/conn.ex

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,6 @@ defmodule Mint.Core.Conn do
6262
@callback put_proxy_headers(conn(), Mint.Types.headers()) :: conn()
6363

6464
@callback put_log(conn(), boolean()) :: conn()
65+
66+
@callback request_body_window(conn(), Types.request_ref()) :: non_neg_integer() | :infinity
6567
end

lib/mint/http.ex

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -637,6 +637,10 @@ defmodule Mint.HTTP do
637637
638638
This function always returns an updated connection to be stored over the old connection.
639639
640+
When streaming a body of arbitrary size, use `request_body_window/2` to learn
641+
how many bytes you can send right now without violating HTTP/2 flow control,
642+
then split your body accordingly before passing each chunk to this function.
643+
640644
For information about transfer encoding and content length in HTTP/1, see
641645
`Mint.HTTP1.stream_request_body/3`.
642646
@@ -1079,6 +1083,75 @@ defmodule Mint.HTTP do
10791083
@impl true
10801084
def put_proxy_headers(conn, headers), do: conn_apply(conn, :put_proxy_headers, [conn, headers])
10811085

1086+
@doc """
1087+
Returns the request body flow-control window for the streaming request
1088+
identified by `request_ref`.
1089+
1090+
The semantics differ by protocol:
1091+
1092+
* In HTTP/2, returns `min(connection_window, stream_window)` — the maximum
1093+
number of body bytes that can be sent right now without violating flow
1094+
control. Exceeding this value in a single `DATA` frame would close the
1095+
connection with a `FLOW_CONTROL_ERROR`. See `Mint.HTTP2.get_window_size/2`
1096+
for the underlying primitives.
1097+
1098+
* In HTTP/1, returns `:infinity`. HTTP/1 has no application-level
1099+
flow-control mechanism: any amount of body data is protocol-valid.
1100+
1101+
The value returned reflects only the protocol-level flow-control
1102+
constraint. It does not account for the operating-system socket send
1103+
buffer: under either protocol, `stream_request_body/3` can still block
1104+
when that buffer fills up. To bound this behavior, configure
1105+
`send_timeout` on the socket via `:transport_opts` when establishing the
1106+
connection (see `Mint.HTTP.connect/4`).
1107+
1108+
Raises `ArgumentError` if `request_ref` is not associated with an active
1109+
streaming request.
1110+
1111+
## Examples
1112+
1113+
Streaming a binary body in chunks that respect the protocol window:
1114+
1115+
defp stream_body(conn, ref, "") do
1116+
Mint.HTTP.stream_request_body(conn, ref, :eof)
1117+
end
1118+
1119+
defp stream_body(conn, ref, body) do
1120+
conn
1121+
|> Mint.HTTP.request_body_window(ref)
1122+
|> send_body_chunk(conn, ref, body)
1123+
end
1124+
1125+
defp send_body_chunk(0, conn, ref, body) do
1126+
with {:ok, conn} <- wait(conn, ref) do
1127+
stream_body(conn, ref, body)
1128+
end
1129+
end
1130+
1131+
defp send_body_chunk(window, conn, ref, body) do
1132+
chunk_size = min(window, byte_size(body))
1133+
<<chunk::binary-size(chunk_size), rest::binary>> = body
1134+
1135+
with {:ok, conn} <- Mint.HTTP.stream_request_body(conn, ref, chunk) do
1136+
stream_body(conn, ref, rest)
1137+
end
1138+
end
1139+
1140+
defp wait(conn, ref) do
1141+
# Wait for the server to refill the request body window with a
1142+
# WINDOW_UPDATE frame. The concrete implementation depends on the
1143+
# socket mode and other context.
1144+
end
1145+
1146+
Note that `min(:infinity, n) == n` thanks to Erlang term ordering, so the
1147+
same loop works on HTTP/1 (each iteration sends the entire remaining body in
1148+
a single chunk) and on HTTP/2 (each iteration sends at most the current
1149+
flow-control window).
1150+
"""
1151+
@doc since: "1.8.0"
1152+
@impl true
1153+
def request_body_window(conn, ref), do: conn_apply(conn, :request_body_window, [conn, ref])
1154+
10821155
## Helpers
10831156

10841157
defp conn_apply(%UnsafeProxy{}, fun, args), do: apply(UnsafeProxy, fun, args)

lib/mint/http1.ex

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -667,6 +667,18 @@ defmodule Mint.HTTP1 do
667667
%{conn | proxy_headers: headers}
668668
end
669669

670+
@doc """
671+
See `Mint.HTTP.request_body_window/2`.
672+
"""
673+
@doc since: "1.8.0"
674+
@impl true
675+
def request_body_window(%__MODULE__{streaming_request: %{ref: ref}}, ref), do: :infinity
676+
677+
def request_body_window(%__MODULE__{}, ref) do
678+
raise ArgumentError,
679+
"request with request reference #{inspect(ref)} was not found or is not streaming a body"
680+
end
681+
670682
## Helpers
671683

672684
defp decode(:status, %{request: request} = conn, data, responses) do

lib/mint/http2.ex

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1253,6 +1253,15 @@ defmodule Mint.HTTP2 do
12531253
%{conn | proxy_headers: headers}
12541254
end
12551255

1256+
@doc """
1257+
See `Mint.HTTP.request_body_window/2`.
1258+
"""
1259+
@doc since: "1.8.0"
1260+
@impl true
1261+
def request_body_window(%__MODULE__{} = conn, ref) do
1262+
min(get_window_size(conn, :connection), get_window_size(conn, {:request, ref}))
1263+
end
1264+
12561265
## Helpers
12571266

12581267
defp handle_closed(conn) do

lib/mint/unsafe_proxy.ex

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,4 +199,9 @@ defmodule Mint.UnsafeProxy do
199199
def put_proxy_headers(%__MODULE__{}, _headers) do
200200
raise "invalid function for proxy unsafe proxy connections"
201201
end
202+
203+
@impl true
204+
def request_body_window(%__MODULE__{module: module, state: state}, ref) do
205+
module.request_body_window(state, ref)
206+
end
202207
end

test/http_test.exs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,27 @@
11
defmodule Mint.HTTPTest do
22
use ExUnit.Case, async: true
33
doctest Mint.HTTP
4+
5+
alias Mint.{HTTP, HTTP1.TestServer}
6+
7+
setup do
8+
{:ok, port, server_ref} = TestServer.start()
9+
assert {:ok, conn} = HTTP.connect(:http, "localhost", port)
10+
assert_receive {^server_ref, server_socket}
11+
12+
[conn: conn, server_socket: server_socket]
13+
end
14+
15+
describe "request_body_window/2" do
16+
test "returns :infinity for an HTTP/1 streaming request", %{conn: conn} do
17+
{:ok, conn, ref} = HTTP.request(conn, "GET", "/", [], :stream)
18+
assert HTTP.request_body_window(conn, ref) == :infinity
19+
end
20+
21+
test "raises ArgumentError for an unknown request ref", %{conn: conn} do
22+
assert_raise ArgumentError, fn ->
23+
HTTP.request_body_window(conn, make_ref())
24+
end
25+
end
26+
end
427
end

test/mint/http1/conn_test.exs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1151,6 +1151,19 @@ defmodule Mint.HTTP1Test do
11511151
{:ok, conn, responses}
11521152
end
11531153

1154+
describe "request_body_window/2" do
1155+
test "returns :infinity for an active streaming request", %{conn: conn} do
1156+
{:ok, conn, ref} = HTTP1.request(conn, "GET", "/", [], :stream)
1157+
assert HTTP1.request_body_window(conn, ref) == :infinity
1158+
end
1159+
1160+
test "raises if no request is currently streaming a body", %{conn: conn} do
1161+
assert_raise ArgumentError, ~r/was not found or is not streaming a body/, fn ->
1162+
HTTP1.request_body_window(conn, make_ref())
1163+
end
1164+
end
1165+
end
1166+
11541167
@mint_user_agent "mint/#{Mix.Project.config()[:version]}"
11551168
defp mint_user_agent, do: @mint_user_agent
11561169
end

test/mint/http2/conn_test.exs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1890,6 +1890,82 @@ defmodule Mint.HTTP2Test do
18901890
HTTP2.get_window_size(conn, {:request, make_ref()})
18911891
end
18921892
end
1893+
1894+
test "request_body_window/2 returns the minimum of connection and request window sizes",
1895+
%{conn: conn} do
1896+
{conn, ref} = open_request(conn, :stream)
1897+
1898+
send_window = HTTP2.request_body_window(conn, ref)
1899+
conn_window = HTTP2.get_window_size(conn, :connection)
1900+
request_window = HTTP2.get_window_size(conn, {:request, ref})
1901+
1902+
assert send_window == min(conn_window, request_window)
1903+
end
1904+
1905+
test "request_body_window/2 decreases after streaming body data", %{conn: conn} do
1906+
{conn, ref} = open_request(conn, :stream)
1907+
1908+
initial_send_window = HTTP2.request_body_window(conn, ref)
1909+
assert initial_send_window > 0
1910+
1911+
body_chunk = "hello"
1912+
{:ok, conn} = HTTP2.stream_request_body(conn, ref, body_chunk)
1913+
1914+
assert HTTP2.request_body_window(conn, ref) == initial_send_window - byte_size(body_chunk)
1915+
end
1916+
1917+
test "request_body_window/2 raises if the request is not found", %{conn: conn} do
1918+
assert_raise ArgumentError, ~r/request with request reference .+ was not found/, fn ->
1919+
HTTP2.request_body_window(conn, make_ref())
1920+
end
1921+
end
1922+
1923+
@tag server_settings: [initial_window_size: 5]
1924+
test "streaming a body larger than the window using request_body_window/2 in a loop",
1925+
%{conn: conn} do
1926+
{conn, ref} = open_request(conn, :stream)
1927+
1928+
assert_recv_frames [headers(stream_id: stream_id)]
1929+
1930+
body = "0123456789ABCDE"
1931+
1932+
# First chunk: window is 5, so we send 5 bytes.
1933+
assert HTTP2.request_body_window(conn, ref) == 5
1934+
<<chunk1::binary-size(5), rest1::binary>> = body
1935+
{:ok, conn} = HTTP2.stream_request_body(conn, ref, chunk1)
1936+
1937+
assert HTTP2.request_body_window(conn, ref) == 0
1938+
1939+
assert_recv_frames [data(stream_id: ^stream_id, data: ^chunk1, flags: flags1)]
1940+
assert flags1 == set_flags(:data, [])
1941+
1942+
# Server replenishes the stream window so we can send more.
1943+
{:ok, conn, []} =
1944+
stream_frames(conn, [window_update(stream_id: stream_id, window_size_increment: 5)])
1945+
1946+
assert HTTP2.request_body_window(conn, ref) == 5
1947+
<<chunk2::binary-size(5), rest2::binary>> = rest1
1948+
{:ok, conn} = HTTP2.stream_request_body(conn, ref, chunk2)
1949+
1950+
assert_recv_frames [data(stream_id: ^stream_id, data: ^chunk2)]
1951+
1952+
# Final replenishment for the remaining bytes plus :eof.
1953+
{:ok, conn, []} =
1954+
stream_frames(conn, [
1955+
window_update(stream_id: stream_id, window_size_increment: byte_size(rest2))
1956+
])
1957+
1958+
assert HTTP2.request_body_window(conn, ref) == byte_size(rest2)
1959+
{:ok, conn} = HTTP2.stream_request_body(conn, ref, rest2)
1960+
{:ok, _conn} = HTTP2.stream_request_body(conn, ref, :eof)
1961+
1962+
assert_recv_frames [
1963+
data(stream_id: ^stream_id, data: ^rest2),
1964+
data(stream_id: ^stream_id, data: "", flags: end_flags)
1965+
]
1966+
1967+
assert end_flags == set_flags(:data, [:end_stream])
1968+
end
18931969
end
18941970

18951971
describe "default receive windows" do

0 commit comments

Comments
 (0)