From a2a1dcc43ee17abdaef4ca73853495c1416b74d2 Mon Sep 17 00:00:00 2001 From: Alexey Nikitin Date: Thu, 19 Mar 2026 12:43:01 +0400 Subject: [PATCH] feat: polymorphic helper for request body streaming --- .dialyzer_ignore | 2 +- CHANGELOG.md | 6 +++ lib/mint/core/conn.ex | 2 + lib/mint/http.ex | 99 +++++++++++++++++++++++++++++++++++ lib/mint/http1.ex | 29 +++++++++- lib/mint/http2.ex | 9 ++++ lib/mint/unsafe_proxy.ex | 5 ++ test/http_test.exs | 48 +++++++++++++++++ test/mint/http1/conn_test.exs | 47 +++++++++++++++++ test/mint/http2/conn_test.exs | 29 ++++++++++ 10 files changed, 273 insertions(+), 3 deletions(-) diff --git a/.dialyzer_ignore b/.dialyzer_ignore index a13cca82..27802436 100644 --- a/.dialyzer_ignore +++ b/.dialyzer_ignore @@ -1,5 +1,5 @@ lib/mint/tunnel_proxy.ex:49 -lib/mint/http1.ex:915 +lib/mint/http1.ex:940 lib/mint/unsafe_proxy.ex:173 lib/mint/unsafe_proxy.ex:198 test/support diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ac0c11c..06247df2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## Unreleased + +### New features + + * Add `Mint.HTTP.next_body_chunk/3` and `Mint.HTTP.next_body_chunk_size/2` helpers for streaming a request body of arbitrary size while respecting the connection's current send window. In HTTP/2 the chunk size is bounded by the connection-level and per-request flow-control windows; in HTTP/1 it is bounded by the OS socket send buffer (`:sndbuf`) cached at connect time. + ## v1.7.1 ### Bug Fixes and Improvements diff --git a/lib/mint/core/conn.ex b/lib/mint/core/conn.ex index 12af3b2c..b000c8a2 100644 --- a/lib/mint/core/conn.ex +++ b/lib/mint/core/conn.ex @@ -62,4 +62,6 @@ defmodule Mint.Core.Conn do @callback put_proxy_headers(conn(), Mint.Types.headers()) :: conn() @callback put_log(conn(), boolean()) :: conn() + + @callback next_body_chunk_size(conn(), Types.request_ref()) :: non_neg_integer() end diff --git a/lib/mint/http.ex b/lib/mint/http.ex index 71c1c6ff..02712f90 100644 --- a/lib/mint/http.ex +++ b/lib/mint/http.ex @@ -623,6 +623,9 @@ defmodule Mint.HTTP do This function always returns an updated connection to be stored over the old connection. + When streaming a body of arbitrary size, use `next_body_chunk/3` to split it + into chunks that respect the current send window of the connection. + For information about transfer encoding and content length in HTTP/1, see `Mint.HTTP1.stream_request_body/3`. @@ -1065,6 +1068,102 @@ defmodule Mint.HTTP do @impl true def put_proxy_headers(conn, headers), do: conn_apply(conn, :put_proxy_headers, [conn, headers]) + @doc """ + Returns the maximum number of body bytes that can be streamed on the connection + for the given `request_ref` right now. + + The meaning of the returned value depends on the protocol of the underlying + connection: + + * In HTTP/1, it is the operating-system socket send buffer (`:sndbuf`) cached + on the connection at connect time. HTTP/1 has no flow-control mechanism, so + this is purely an I/O sizing hint — passing a larger chunk to + `stream_request_body/3` is allowed and will not violate the protocol. + + * In HTTP/2, it is the minimum of the connection-level and the per-request + flow-control windows. Sending more than this in a single `DATA` frame would + violate flow control, so the value is a hard upper bound. See + `Mint.HTTP2.get_window_size/2` for the underlying primitives. + + Raises `ArgumentError` if `request_ref` is not associated with an active + streaming request. + + See `next_body_chunk/3` for a higher-level helper that uses this value to + split a binary body into a sendable chunk. + """ + @doc since: "1.8.0" + @impl true + @spec next_body_chunk_size(t(), Types.request_ref()) :: non_neg_integer() + def next_body_chunk_size(conn, ref), do: conn_apply(conn, :next_body_chunk_size, [conn, ref]) + + @doc """ + Splits off the next chunk of `body` that can be streamed on the connection right now. + + This is a helper to be used together with `stream_request_body/3` when streaming a + request body of arbitrary size. Given a `body` binary, it inspects the connection + state via `next_body_chunk_size/2` to determine the largest chunk that can be sent + immediately, and returns that chunk together with the remainder to be streamed later. + + The return value is a `{chunk, rest}` tuple such that `chunk <> rest == body`. + When `body` is empty (or smaller than the available send window), `rest` is `""`. + + `body` must be a `t:binary/0`. If you have an `t:iodata/0` body, convert it + with `IO.iodata_to_binary/1` before calling this function. + + This function performs no I/O — it only computes how to split the binary. You + still need to pass `chunk` to `stream_request_body/3` to actually send it on + the wire. + + See `next_body_chunk_size/2` for the protocol-specific semantics of the chunk + size and a note on HTTP/1 vs HTTP/2 behavior. + + ## When not to use this helper + + This helper is convenient for small to medium request bodies that already live + as a single binary in memory. For large bodies — especially iodata assembled + from multiple parts, refc-binaries read from files, or streamed from another + source — calling `IO.iodata_to_binary/1` just to use this helper will: + + * double peak memory usage while the temporary binary is built; + * defeat the scatter-gather (`writev/2`) optimization that `:gen_tcp.send` + and `:ssl.send` perform on iolists, forcing an extra full-buffer copy. + + In those cases, drive the loop yourself: query `next_body_chunk_size/2` to + learn how many bytes you can send right now, take that prefix from your iodata + source without materializing the rest, hand it to `stream_request_body/3`, and + repeat. Mint's flow-control state is fully exposed through + `next_body_chunk_size/2` — this helper is just one ergonomic shape over it, + not the only supported one. + + ## Examples + + Streaming a body of arbitrary size by repeatedly chunking against the current + send window: + + {:ok, conn, ref} = Mint.HTTP.request(conn, "POST", "/", headers, :stream) + {:ok, conn} = stream_body(conn, ref, payload) + + defp stream_body(conn, ref, "") do + Mint.HTTP.stream_request_body(conn, ref, :eof) + end + + defp stream_body(conn, ref, body) do + {chunk, rest} = Mint.HTTP.next_body_chunk(conn, ref, body) + + with {:ok, conn} <- Mint.HTTP.stream_request_body(conn, ref, chunk) do + stream_body(conn, ref, rest) + end + end + + """ + @doc since: "1.8.0" + @spec next_body_chunk(t(), Types.request_ref(), binary()) :: {binary(), binary()} + def next_body_chunk(conn, ref, body) when is_binary(body) do + chunk_size = min(next_body_chunk_size(conn, ref), byte_size(body)) + <> = body + {chunk, rest} + end + ## Helpers defp conn_apply(%UnsafeProxy{}, fun, args), do: apply(UnsafeProxy, fun, args) diff --git a/lib/mint/http1.ex b/lib/mint/http1.ex index 941e12d0..5b7c4915 100644 --- a/lib/mint/http1.ex +++ b/lib/mint/http1.ex @@ -102,7 +102,13 @@ defmodule Mint.HTTP1 do proxy_headers: [], private: %{}, log: false, - optional_responses: [] + optional_responses: [], + # Chunk size returned by `next_body_chunk_size/2`. Seeded from the OS socket + # send buffer (`:sndbuf`) at connect time, falling back to 16 KiB. Cached on + # the connection so the streaming hot path doesn't pay a `getopts` syscall + # per chunk. HTTP/1 has no flow-control window, so this is purely a sizing + # hint and does not need to track the live socket value. + body_chunk_size: 16_384 ] defmacrop log(conn, level, message) do @@ -204,6 +210,7 @@ defmodule Mint.HTTP1 do end with :ok <- Util.inet_opts(transport, socket), + {:ok, sndbuf_opts} <- transport.getopts(socket, [:sndbuf]), :ok <- if(mode == :active, do: transport.setopts(socket, active: :once), else: :ok) do conn = %__MODULE__{ transport: transport, @@ -216,7 +223,8 @@ defmodule Mint.HTTP1 do log: log?, case_sensitive_headers: Keyword.get(opts, :case_sensitive_headers, false), skip_target_validation: Keyword.get(opts, :skip_target_validation, false), - optional_responses: validate_optional_response_values(opts) + optional_responses: validate_optional_response_values(opts), + body_chunk_size: sndbuf_opts[:sndbuf] || 16_384 } {:ok, conn} @@ -667,6 +675,23 @@ defmodule Mint.HTTP1 do %{conn | proxy_headers: headers} end + @doc """ + See `Mint.HTTP.next_body_chunk_size/2`. + """ + @doc since: "1.8.0" + @impl true + @spec next_body_chunk_size(t(), Types.request_ref()) :: non_neg_integer() + def next_body_chunk_size( + %__MODULE__{streaming_request: %{ref: ref}, body_chunk_size: body_chunk_size}, + ref + ), + do: body_chunk_size + + def next_body_chunk_size(%__MODULE__{}, ref) do + raise ArgumentError, + "request with request reference #{inspect(ref)} was not found or is not streaming a body" + end + ## Helpers defp decode(:status, %{request: request} = conn, data, responses) do diff --git a/lib/mint/http2.ex b/lib/mint/http2.ex index d1b562d0..fd84b376 100644 --- a/lib/mint/http2.ex +++ b/lib/mint/http2.ex @@ -1039,6 +1039,15 @@ defmodule Mint.HTTP2 do %{conn | proxy_headers: headers} end + @doc """ + See `Mint.HTTP.next_body_chunk_size/2`. + """ + @doc since: "1.8.0" + @impl true + @spec next_body_chunk_size(t(), Types.request_ref()) :: non_neg_integer() + def next_body_chunk_size(%__MODULE__{} = conn, ref), + do: min(get_window_size(conn, :connection), get_window_size(conn, {:request, ref})) + ## Helpers defp handle_closed(conn) do diff --git a/lib/mint/unsafe_proxy.ex b/lib/mint/unsafe_proxy.ex index b6318ffa..7a4358f4 100644 --- a/lib/mint/unsafe_proxy.ex +++ b/lib/mint/unsafe_proxy.ex @@ -199,4 +199,9 @@ defmodule Mint.UnsafeProxy do def put_proxy_headers(%__MODULE__{}, _headers) do raise "invalid function for proxy unsafe proxy connections" end + + @impl true + def next_body_chunk_size(%__MODULE__{module: module, state: state}, ref) do + module.next_body_chunk_size(state, ref) + end end diff --git a/test/http_test.exs b/test/http_test.exs index 70e307c2..3c6b099f 100644 --- a/test/http_test.exs +++ b/test/http_test.exs @@ -1,4 +1,52 @@ defmodule Mint.HTTPTest do use ExUnit.Case, async: true doctest Mint.HTTP + + alias Mint.{HTTP, HTTP1.TestServer} + + setup do + {:ok, port, server_ref} = TestServer.start() + assert {:ok, conn} = HTTP.connect(:http, "localhost", port) + assert_receive {^server_ref, server_socket} + + [conn: conn, server_socket: server_socket] + end + + describe "next_body_chunk_size/2" do + test "returns a positive integer for an active streaming request", %{conn: conn} do + {:ok, conn, ref} = HTTP.request(conn, "GET", "/", [], :stream) + + send_window = HTTP.next_body_chunk_size(conn, ref) + assert is_integer(send_window) + assert send_window > 0 + end + + test "raises ArgumentError for an unknown request ref", %{conn: conn} do + assert_raise ArgumentError, fn -> + HTTP.next_body_chunk_size(conn, make_ref()) + end + end + end + + describe "next_body_chunk/3" do + test "returns body size when body is smaller than send window", %{conn: conn} do + {:ok, conn, ref} = HTTP.request(conn, "GET", "/", [], :stream) + small_body = "hello" + assert {"hello", ""} = HTTP.next_body_chunk(conn, ref, small_body) + end + + test "returns send window when body is larger than send window", %{conn: conn} do + {:ok, conn, ref} = HTTP.request(conn, "GET", "/", [], :stream) + chunk_size = HTTP.next_body_chunk_size(conn, ref) + large_body = :binary.copy(<<0>>, chunk_size + 1000) + assert {chunk, rest} = HTTP.next_body_chunk(conn, ref, large_body) + assert byte_size(chunk) == chunk_size + assert byte_size(rest) == 1000 + end + + test "returns 0 for empty body", %{conn: conn} do + {:ok, conn, ref} = HTTP.request(conn, "GET", "/", [], :stream) + assert {"", ""} = HTTP.next_body_chunk(conn, ref, "") + end + end end diff --git a/test/mint/http1/conn_test.exs b/test/mint/http1/conn_test.exs index e354be14..48247706 100644 --- a/test/mint/http1/conn_test.exs +++ b/test/mint/http1/conn_test.exs @@ -1151,6 +1151,53 @@ defmodule Mint.HTTP1Test do {:ok, conn, responses} end + describe "next_body_chunk_size/2" do + test "returns a positive integer for an active streaming request", %{conn: conn} do + {:ok, conn, ref} = HTTP1.request(conn, "GET", "/", [], :stream) + + send_window = HTTP1.next_body_chunk_size(conn, ref) + assert is_integer(send_window) + assert send_window > 0 + end + + test "returns the cached body_chunk_size value", %{conn: conn} do + {:ok, conn, ref} = HTTP1.request(conn, "GET", "/", [], :stream) + assert HTTP1.next_body_chunk_size(conn, ref) == conn.body_chunk_size + end + + test "raises if no request is currently streaming a body", %{conn: conn} do + assert_raise ArgumentError, ~r/was not found or is not streaming a body/, fn -> + HTTP1.next_body_chunk_size(conn, make_ref()) + end + end + + test "raises if the ref does not match the active streaming request", %{conn: conn} do + {:ok, conn, _ref} = HTTP1.request(conn, "GET", "/", [], :stream) + + assert_raise ArgumentError, ~r/was not found or is not streaming a body/, fn -> + HTTP1.next_body_chunk_size(conn, make_ref()) + end + end + end + + describe "next_body_chunk/3" do + test "iteratively drains a binary body across multiple calls", %{conn: conn} do + {:ok, conn, ref} = HTTP1.request(conn, "GET", "/", [], :stream) + body = :binary.copy(<>, conn.body_chunk_size * 3 + 17) + + chunks = drain_body(conn, ref, body, []) + assert IO.iodata_to_binary(chunks) == body + assert length(chunks) == 4 + end + end + + defp drain_body(_conn, _ref, "", chunks), do: Enum.reverse(chunks) + + defp drain_body(conn, ref, body, chunks) do + {chunk, rest} = Mint.HTTP.next_body_chunk(conn, ref, body) + drain_body(conn, ref, rest, [chunk | chunks]) + end + @mint_user_agent "mint/#{Mix.Project.config()[:version]}" defp mint_user_agent, do: @mint_user_agent end diff --git a/test/mint/http2/conn_test.exs b/test/mint/http2/conn_test.exs index 27215664..bccc3349 100644 --- a/test/mint/http2/conn_test.exs +++ b/test/mint/http2/conn_test.exs @@ -1772,6 +1772,35 @@ defmodule Mint.HTTP2Test do HTTP2.get_window_size(conn, {:request, make_ref()}) end end + + test "next_body_chunk_size/2 returns the minimum of connection and request window sizes", + %{conn: conn} do + {conn, ref} = open_request(conn, :stream) + + send_window = HTTP2.next_body_chunk_size(conn, ref) + conn_window = HTTP2.get_window_size(conn, :connection) + request_window = HTTP2.get_window_size(conn, {:request, ref}) + + assert send_window == min(conn_window, request_window) + end + + test "next_body_chunk_size/2 decreases after streaming body data", %{conn: conn} do + {conn, ref} = open_request(conn, :stream) + + initial_send_window = HTTP2.next_body_chunk_size(conn, ref) + assert initial_send_window > 0 + + body_chunk = "hello" + {:ok, conn} = HTTP2.stream_request_body(conn, ref, body_chunk) + + assert HTTP2.next_body_chunk_size(conn, ref) == initial_send_window - byte_size(body_chunk) + end + + test "next_body_chunk_size/2 raises if the request is not found", %{conn: conn} do + assert_raise ArgumentError, ~r/request with request reference .+ was not found/, fn -> + HTTP2.next_body_chunk_size(conn, make_ref()) + end + end end describe "settings" do