Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .dialyzer_ignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
lib/mint/tunnel_proxy.ex:49
lib/mint/http1.ex:915
lib/mint/http1.ex:940
lib/mint/unsafe_proxy.ex:173
lib/mint/unsafe_proxy.ex:198
test/support
Expand Down
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## Unreleased

### New features

* Add `Mint.HTTP.next_body_chunk/3` and `Mint.HTTP.next_body_chunk_size/2` helpers for streaming a request body of arbitrary size while respecting the connection's current send window. In HTTP/2 the chunk size is bounded by the connection-level and per-request flow-control windows; in HTTP/1 it is bounded by the OS socket send buffer (`:sndbuf`) cached at connect time.

## v1.7.1

### Bug Fixes and Improvements
Expand Down
2 changes: 2 additions & 0 deletions lib/mint/core/conn.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,6 @@ defmodule Mint.Core.Conn do
@callback put_proxy_headers(conn(), Mint.Types.headers()) :: conn()

@callback put_log(conn(), boolean()) :: conn()

@callback next_body_chunk_size(conn(), Types.request_ref()) :: non_neg_integer()
end
99 changes: 99 additions & 0 deletions lib/mint/http.ex
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,9 @@ defmodule Mint.HTTP do

This function always returns an updated connection to be stored over the old connection.

When streaming a body of arbitrary size, use `next_body_chunk/3` to split it
into chunks that respect the current send window of the connection.

For information about transfer encoding and content length in HTTP/1, see
`Mint.HTTP1.stream_request_body/3`.

Expand Down Expand Up @@ -1065,6 +1068,102 @@ defmodule Mint.HTTP do
@impl true
def put_proxy_headers(conn, headers), do: conn_apply(conn, :put_proxy_headers, [conn, headers])

@doc """
Returns the maximum number of body bytes that can be streamed on the connection
for the given `request_ref` right now.

The meaning of the returned value depends on the protocol of the underlying
connection:

* In HTTP/1, it is the operating-system socket send buffer (`:sndbuf`) cached
on the connection at connect time. HTTP/1 has no flow-control mechanism, so
this is purely an I/O sizing hint — passing a larger chunk to
`stream_request_body/3` is allowed and will not violate the protocol.

* In HTTP/2, it is the minimum of the connection-level and the per-request
flow-control windows. Sending more than this in a single `DATA` frame would
violate flow control, so the value is a hard upper bound. See
`Mint.HTTP2.get_window_size/2` for the underlying primitives.

Raises `ArgumentError` if `request_ref` is not associated with an active
streaming request.

See `next_body_chunk/3` for a higher-level helper that uses this value to
split a binary body into a sendable chunk.
"""
@doc since: "1.8.0"
@impl true
@spec next_body_chunk_size(t(), Types.request_ref()) :: non_neg_integer()
def next_body_chunk_size(conn, ref), do: conn_apply(conn, :next_body_chunk_size, [conn, ref])

@doc """
Splits off the next chunk of `body` that can be streamed on the connection right now.

This is a helper to be used together with `stream_request_body/3` when streaming a
request body of arbitrary size. Given a `body` binary, it inspects the connection
state via `next_body_chunk_size/2` to determine the largest chunk that can be sent
immediately, and returns that chunk together with the remainder to be streamed later.

The return value is a `{chunk, rest}` tuple such that `chunk <> rest == body`.
When `body` is empty (or smaller than the available send window), `rest` is `""`.

`body` must be a `t:binary/0`. If you have an `t:iodata/0` body, convert it
with `IO.iodata_to_binary/1` before calling this function.

This function performs no I/O — it only computes how to split the binary. You
still need to pass `chunk` to `stream_request_body/3` to actually send it on
the wire.

See `next_body_chunk_size/2` for the protocol-specific semantics of the chunk
size and a note on HTTP/1 vs HTTP/2 behavior.

## When not to use this helper

This helper is convenient for small to medium request bodies that already live
as a single binary in memory. For large bodies — especially iodata assembled
from multiple parts, refc-binaries read from files, or streamed from another
source — calling `IO.iodata_to_binary/1` just to use this helper will:

* double peak memory usage while the temporary binary is built;
* defeat the scatter-gather (`writev/2`) optimization that `:gen_tcp.send`
and `:ssl.send` perform on iolists, forcing an extra full-buffer copy.

In those cases, drive the loop yourself: query `next_body_chunk_size/2` to
learn how many bytes you can send right now, take that prefix from your iodata
source without materializing the rest, hand it to `stream_request_body/3`, and
repeat. Mint's flow-control state is fully exposed through
`next_body_chunk_size/2` — this helper is just one ergonomic shape over it,
not the only supported one.

## Examples

Streaming a body of arbitrary size by repeatedly chunking against the current
send window:

{:ok, conn, ref} = Mint.HTTP.request(conn, "POST", "/", headers, :stream)
{:ok, conn} = stream_body(conn, ref, payload)

defp stream_body(conn, ref, "") do
Mint.HTTP.stream_request_body(conn, ref, :eof)
end

defp stream_body(conn, ref, body) do
{chunk, rest} = Mint.HTTP.next_body_chunk(conn, ref, body)

with {:ok, conn} <- Mint.HTTP.stream_request_body(conn, ref, chunk) do
stream_body(conn, ref, rest)
end
end

"""
@doc since: "1.8.0"
@spec next_body_chunk(t(), Types.request_ref(), binary()) :: {binary(), binary()}
def next_body_chunk(conn, ref, body) when is_binary(body) do
chunk_size = min(next_body_chunk_size(conn, ref), byte_size(body))
<<chunk::binary-size(chunk_size), rest::binary>> = body
{chunk, rest}
end

## Helpers

defp conn_apply(%UnsafeProxy{}, fun, args), do: apply(UnsafeProxy, fun, args)
Expand Down
29 changes: 27 additions & 2 deletions lib/mint/http1.ex
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,13 @@ defmodule Mint.HTTP1 do
proxy_headers: [],
private: %{},
log: false,
optional_responses: []
optional_responses: [],
# Chunk size returned by `next_body_chunk_size/2`. Seeded from the OS socket
# send buffer (`:sndbuf`) at connect time, falling back to 16 KiB. Cached on
# the connection so the streaming hot path doesn't pay a `getopts` syscall
# per chunk. HTTP/1 has no flow-control window, so this is purely a sizing
# hint and does not need to track the live socket value.
body_chunk_size: 16_384
]

defmacrop log(conn, level, message) do
Expand Down Expand Up @@ -204,6 +210,7 @@ defmodule Mint.HTTP1 do
end

with :ok <- Util.inet_opts(transport, socket),
{:ok, sndbuf_opts} <- transport.getopts(socket, [:sndbuf]),
:ok <- if(mode == :active, do: transport.setopts(socket, active: :once), else: :ok) do
conn = %__MODULE__{
transport: transport,
Expand All @@ -216,7 +223,8 @@ defmodule Mint.HTTP1 do
log: log?,
case_sensitive_headers: Keyword.get(opts, :case_sensitive_headers, false),
skip_target_validation: Keyword.get(opts, :skip_target_validation, false),
optional_responses: validate_optional_response_values(opts)
optional_responses: validate_optional_response_values(opts),
body_chunk_size: sndbuf_opts[:sndbuf] || 16_384
}

{:ok, conn}
Expand Down Expand Up @@ -667,6 +675,23 @@ defmodule Mint.HTTP1 do
%{conn | proxy_headers: headers}
end

@doc """
See `Mint.HTTP.next_body_chunk_size/2`.
"""
@doc since: "1.8.0"
@impl true
@spec next_body_chunk_size(t(), Types.request_ref()) :: non_neg_integer()
def next_body_chunk_size(
%__MODULE__{streaming_request: %{ref: ref}, body_chunk_size: body_chunk_size},
ref
),
do: body_chunk_size

def next_body_chunk_size(%__MODULE__{}, ref) do
raise ArgumentError,
"request with request reference #{inspect(ref)} was not found or is not streaming a body"
end

## Helpers

defp decode(:status, %{request: request} = conn, data, responses) do
Expand Down
9 changes: 9 additions & 0 deletions lib/mint/http2.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1039,6 +1039,15 @@ defmodule Mint.HTTP2 do
%{conn | proxy_headers: headers}
end

@doc """
See `Mint.HTTP.next_body_chunk_size/2`.
"""
@doc since: "1.8.0"
@impl true
@spec next_body_chunk_size(t(), Types.request_ref()) :: non_neg_integer()
def next_body_chunk_size(%__MODULE__{} = conn, ref),
do: min(get_window_size(conn, :connection), get_window_size(conn, {:request, ref}))

## Helpers

defp handle_closed(conn) do
Expand Down
5 changes: 5 additions & 0 deletions lib/mint/unsafe_proxy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,9 @@ defmodule Mint.UnsafeProxy do
def put_proxy_headers(%__MODULE__{}, _headers) do
raise "invalid function for proxy unsafe proxy connections"
end

@impl true
def next_body_chunk_size(%__MODULE__{module: module, state: state}, ref) do
module.next_body_chunk_size(state, ref)
end
end
48 changes: 48 additions & 0 deletions test/http_test.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,52 @@
defmodule Mint.HTTPTest do
use ExUnit.Case, async: true
doctest Mint.HTTP

alias Mint.{HTTP, HTTP1.TestServer}

setup do
{:ok, port, server_ref} = TestServer.start()
assert {:ok, conn} = HTTP.connect(:http, "localhost", port)
assert_receive {^server_ref, server_socket}

[conn: conn, server_socket: server_socket]
end

describe "next_body_chunk_size/2" do
test "returns a positive integer for an active streaming request", %{conn: conn} do
{:ok, conn, ref} = HTTP.request(conn, "GET", "/", [], :stream)

send_window = HTTP.next_body_chunk_size(conn, ref)
assert is_integer(send_window)
assert send_window > 0
end

test "raises ArgumentError for an unknown request ref", %{conn: conn} do
assert_raise ArgumentError, fn ->
HTTP.next_body_chunk_size(conn, make_ref())
end
end
end

describe "next_body_chunk/3" do
test "returns body size when body is smaller than send window", %{conn: conn} do
{:ok, conn, ref} = HTTP.request(conn, "GET", "/", [], :stream)
small_body = "hello"
assert {"hello", ""} = HTTP.next_body_chunk(conn, ref, small_body)
end

test "returns send window when body is larger than send window", %{conn: conn} do
{:ok, conn, ref} = HTTP.request(conn, "GET", "/", [], :stream)
chunk_size = HTTP.next_body_chunk_size(conn, ref)
large_body = :binary.copy(<<0>>, chunk_size + 1000)
assert {chunk, rest} = HTTP.next_body_chunk(conn, ref, large_body)
assert byte_size(chunk) == chunk_size
assert byte_size(rest) == 1000
end

test "returns 0 for empty body", %{conn: conn} do
{:ok, conn, ref} = HTTP.request(conn, "GET", "/", [], :stream)
assert {"", ""} = HTTP.next_body_chunk(conn, ref, "")
end
end
end
47 changes: 47 additions & 0 deletions test/mint/http1/conn_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,53 @@ defmodule Mint.HTTP1Test do
{:ok, conn, responses}
end

describe "next_body_chunk_size/2" do
test "returns a positive integer for an active streaming request", %{conn: conn} do
{:ok, conn, ref} = HTTP1.request(conn, "GET", "/", [], :stream)

send_window = HTTP1.next_body_chunk_size(conn, ref)
assert is_integer(send_window)
assert send_window > 0
end

test "returns the cached body_chunk_size value", %{conn: conn} do
{:ok, conn, ref} = HTTP1.request(conn, "GET", "/", [], :stream)
assert HTTP1.next_body_chunk_size(conn, ref) == conn.body_chunk_size
end

test "raises if no request is currently streaming a body", %{conn: conn} do
assert_raise ArgumentError, ~r/was not found or is not streaming a body/, fn ->
HTTP1.next_body_chunk_size(conn, make_ref())
end
end

test "raises if the ref does not match the active streaming request", %{conn: conn} do
{:ok, conn, _ref} = HTTP1.request(conn, "GET", "/", [], :stream)

assert_raise ArgumentError, ~r/was not found or is not streaming a body/, fn ->
HTTP1.next_body_chunk_size(conn, make_ref())
end
end
end

describe "next_body_chunk/3" do
test "iteratively drains a binary body across multiple calls", %{conn: conn} do
{:ok, conn, ref} = HTTP1.request(conn, "GET", "/", [], :stream)
body = :binary.copy(<<?a>>, conn.body_chunk_size * 3 + 17)

chunks = drain_body(conn, ref, body, [])
assert IO.iodata_to_binary(chunks) == body
assert length(chunks) == 4
end
end

defp drain_body(_conn, _ref, "", chunks), do: Enum.reverse(chunks)

defp drain_body(conn, ref, body, chunks) do
{chunk, rest} = Mint.HTTP.next_body_chunk(conn, ref, body)
drain_body(conn, ref, rest, [chunk | chunks])
end

@mint_user_agent "mint/#{Mix.Project.config()[:version]}"
defp mint_user_agent, do: @mint_user_agent
end
29 changes: 29 additions & 0 deletions test/mint/http2/conn_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1772,6 +1772,35 @@ defmodule Mint.HTTP2Test do
HTTP2.get_window_size(conn, {:request, make_ref()})
end
end

test "next_body_chunk_size/2 returns the minimum of connection and request window sizes",
%{conn: conn} do
{conn, ref} = open_request(conn, :stream)

send_window = HTTP2.next_body_chunk_size(conn, ref)
conn_window = HTTP2.get_window_size(conn, :connection)
request_window = HTTP2.get_window_size(conn, {:request, ref})

assert send_window == min(conn_window, request_window)
end

test "next_body_chunk_size/2 decreases after streaming body data", %{conn: conn} do
{conn, ref} = open_request(conn, :stream)

initial_send_window = HTTP2.next_body_chunk_size(conn, ref)
assert initial_send_window > 0

body_chunk = "hello"
{:ok, conn} = HTTP2.stream_request_body(conn, ref, body_chunk)

assert HTTP2.next_body_chunk_size(conn, ref) == initial_send_window - byte_size(body_chunk)
end

test "next_body_chunk_size/2 raises if the request is not found", %{conn: conn} do
assert_raise ArgumentError, ~r/request with request reference .+ was not found/, fn ->
HTTP2.next_body_chunk_size(conn, make_ref())
end
end
end

describe "settings" do
Expand Down
Loading