From 54e79d54887bb27d040b90aad9e03e2e0a64b250 Mon Sep 17 00:00:00 2001 From: Paul Oliver Date: Fri, 20 Mar 2026 15:41:58 +1300 Subject: [PATCH 1/3] feat: add streaming response support for async requests Modify the C port to optionally stream response data as it arrives rather than buffering the entire response in memory. When stream => true is passed in async request opts, the port sends three message types: {headers, From, {Status, Headers, CookieJar}} - status + headers {chunk, From, Data} - body chunk {done, From, Metrics} - transfer complete The Erlang worker forwards these as: {katipo_response, Ref, {status, Status, Headers, CookieJar}} {katipo_data, Ref, Chunk} {katipo_done, Ref} Non-streaming async and sync paths are unchanged. Errors use the existing {katipo_error, Ref, ErrorMap} format in both modes. Passing stream => true to the sync API (req/2, get/2, etc.) returns a bad_opts error since streaming requires the async message-based API. Co-Authored-By: Claude Opus 4.6 (1M context) --- c_src/katipo.c | 104 ++++++++++- src/katipo.erl | 397 ++++++++++++++++++++++++++++++++---------- test/katipo_SUITE.erl | 203 +++++++++++++++++++++ 3 files changed, 606 insertions(+), 98 deletions(-) diff --git a/c_src/katipo.c b/c_src/katipo.c index b56410b..c70d3b0 100644 --- a/c_src/katipo.c +++ b/c_src/katipo.c @@ -51,6 +51,7 @@ #define K_CURLOPT_DNS_CACHE_TIMEOUT 31 #define K_CURLOPT_CA_CACHE_TIMEOUT 32 #define K_CURLOPT_PIPEWAIT 33 +#define K_CURLOPT_STREAMING 34 #define K_CURLAUTH_BASIC 100 #define K_CURLAUTH_DIGEST 101 @@ -87,6 +88,8 @@ typedef struct _ConnInfo { long response_code; char *post_data; long post_data_size; + int streaming; + int headers_sent; // metrics double total_time; double namelookup_time; @@ -136,6 +139,7 @@ typedef struct _EasyOpts { long curlopt_dns_cache_timeout; long curlopt_ca_cache_timeout; long curlopt_pipewait; + long curlopt_streaming; } EasyOpts; static const char *curl_error_code(CURLcode error) { @@ -555,6 +559,71 @@ static void send_error_to_erlang(CURLcode curl_code, ConnInfo *conn) { ei_x_free(&result); } +static void send_stream_headers_to_erlang(ConnInfo *conn) { + ei_x_buff result; + + curl_easy_getinfo(conn->easy, CURLINFO_RESPONSE_CODE, &conn->response_code); + + if (ei_x_new_with_version(&result) || + ei_x_encode_tuple_header(&result, 3) || + ei_x_encode_atom(&result, "headers") || + + ei_x_encode_tuple_header(&result, 2) || + ei_x_encode_pid(&result, conn->pid) || + ei_x_encode_ref(&result, conn->ref) || + + ei_x_encode_tuple_header(&result, 3) || + ei_x_encode_long(&result, conn->response_code) || + ei_x_encode_list_header(&result, conn->num_headers)) { + errx(2, "Failed to encode stream headers"); + } + + encode_headers(&result, conn); + encode_cookies(&result, conn); + + send_to_erlang(result.buff, result.index); + ei_x_free(&result); + conn->headers_sent = 1; +} + +static void send_stream_chunk_to_erlang(ConnInfo *conn, void *data, size_t len) { + ei_x_buff result; + + if (ei_x_new_with_version(&result) || + ei_x_encode_tuple_header(&result, 3) || + ei_x_encode_atom(&result, "chunk") || + + ei_x_encode_tuple_header(&result, 2) || + ei_x_encode_pid(&result, conn->pid) || + ei_x_encode_ref(&result, conn->ref) || + + ei_x_encode_binary(&result, data, len)) { + errx(2, "Failed to encode stream chunk"); + } + + send_to_erlang(result.buff, result.index); + ei_x_free(&result); +} + +static void send_stream_done_to_erlang(ConnInfo *conn) { + ei_x_buff result; + + if (ei_x_new_with_version(&result) || + ei_x_encode_tuple_header(&result, 3) || + ei_x_encode_atom(&result, "done") || + + ei_x_encode_tuple_header(&result, 2) || + ei_x_encode_pid(&result, conn->pid) || + ei_x_encode_ref(&result, conn->ref)) { + errx(2, "Failed to encode stream done"); + } + + encode_metrics(&result, conn); + + send_to_erlang(result.buff, result.index); + ei_x_free(&result); +} + static void check_multi_info(GlobalInfo *global) { CURLMsg *msg; int msgs_left; @@ -577,7 +646,14 @@ static void check_multi_info(GlobalInfo *global) { curl_easy_getinfo(easy, CURLINFO_STARTTRANSFER_TIME, &conn->starttransfer_time); if (res == CURLE_OK) { - send_ok_to_erlang(conn); + if (conn->streaming) { + if (!conn->headers_sent) { + send_stream_headers_to_erlang(conn); + } + send_stream_done_to_erlang(conn); + } else { + send_ok_to_erlang(conn); + } } else { send_error_to_erlang(res, conn); } @@ -690,15 +766,22 @@ static int multi_timer_cb(CURLM *multi, long timeout_ms, void *userp) { static size_t write_cb(void *ptr, size_t size, size_t nmemb, void *data) { size_t realsize = size * nmemb; ConnInfo *conn = (ConnInfo *)data; - char *new_memory; - new_memory = (char *)realloc(conn->memory, conn->size + realsize); - if (new_memory == NULL) { - return 0; + if (conn->streaming) { + if (!conn->headers_sent) { + send_stream_headers_to_erlang(conn); + } + send_stream_chunk_to_erlang(conn, ptr, realsize); + } else { + char *new_memory; + new_memory = (char *)realloc(conn->memory, conn->size + realsize); + if (new_memory == NULL) { + return 0; + } + conn->memory = new_memory; + memcpy(&(conn->memory[conn->size]), ptr, realsize); + conn->size += realsize; } - conn->memory = new_memory; - memcpy(&(conn->memory[conn->size]), ptr, realsize); - conn->size += realsize; return realsize; } @@ -783,6 +866,8 @@ static void new_conn(long method, char *url, struct curl_slist *req_headers, conn->size = 0; conn->num_headers = 0; conn->resp_headers = NULL; + conn->streaming = eopts.curlopt_streaming; + conn->headers_sent = 0; conn->easy = curl_easy_init(); if (!conn->easy) { @@ -1172,6 +1257,9 @@ static void erl_input(struct bufferevent *ev, void *arg) { case K_CURLOPT_PIPEWAIT: eopts.curlopt_pipewait = eopt_long; break; + case K_CURLOPT_STREAMING: + eopts.curlopt_streaming = eopt_long; + break; default: errx(2, "Unknown eopt long value %ld", eopt); } diff --git a/src/katipo.erl b/src/katipo.erl index 31409b9..1f26eed 100644 --- a/src/katipo.erl +++ b/src/katipo.erl @@ -55,6 +55,24 @@ All request functions return `t:response/0`: -export([delete/2]). -export([delete/3]). +-export([async_req/2]). +-export([async_get/2]). +-export([async_get/3]). +-export([async_post/2]). +-export([async_post/3]). +-export([async_put/2]). +-export([async_put/3]). +-export([async_head/2]). +-export([async_head/3]). +-export([async_options/2]). +-export([async_options/3]). +-export([async_patch/2]). +-export([async_patch/3]). +-export([async_delete/2]). +-export([async_delete/3]). +-export([await/1]). +-export([await/2]). + -export([check_opts/1]). %% only for mocking during tests @@ -139,6 +157,7 @@ All request functions return `t:response/0`: -define(DNS_CACHE_TIMEOUT, 31). -define(CA_CACHE_TIMEOUT, 32). -define(PIPEWAIT, 33). +-define(STREAMING, 34). -define(DEFAULT_REQ_TIMEOUT, 30000). -define(FOLLOWLOCATION_TRUE, 1). @@ -288,7 +307,8 @@ All request functions return `t:response/0`: ech_required | curl_last | %% returned by us, not curl - bad_opts. + bad_opts | + await_timeout. -type curlmopt() :: %% curlmopt_chunk_length_penalty_size | @@ -342,6 +362,8 @@ All request functions return `t:response/0`: %% See [https://curl.haxx.se/libcurl/c/CURLOPT_USERPWD.html] -type request() :: #{url := binary(), method := method(), + reply_to => pid(), + stream => boolean(), headers => headers(), cookiejar => cookiejar(), body => req_body(), @@ -371,7 +393,9 @@ All request functions return `t:response/0`: dns_cache_timeout => integer(), ca_cache_timeout => integer(), pipewait => boolean()}. --type opts() :: #{headers => headers(), +-type opts() :: #{reply_to => pid(), + stream => boolean(), + headers => headers(), cookiejar => cookiejar(), body => req_body(), connecttimeout_ms => connecttimeout_ms(), @@ -409,6 +433,9 @@ All request functions return `t:response/0`: metrics => proplists:proplist()}} | {error, #{code := error_code(), message := error_msg()}}. +-type async_response() :: {ok, reference()} | + {error, #{code := error_code(), + message := error_msg()}}. -type http_auth() :: basic | digest | ntlm | negotiate. -type http_auth_int() :: ?CURLAUTH_UNDEFINED | ?CURLAUTH_BASIC | @@ -465,6 +492,7 @@ All request functions return `t:response/0`: -export_type([sslkey/0]). -export_type([sslkey_blob/0]). -export_type([userpwd/0]). +-export_type([async_response/0]). -record(req, { method = ?GET :: method_int(), @@ -499,7 +527,8 @@ All request functions return `t:response/0`: userpwd = undefined :: undefined | binary(), dns_cache_timeout = 60 :: integer(), ca_cache_timeout = 86400 :: integer(), - pipewait = ?PIPEWAIT_TRUE :: ?PIPEWAIT_FALSE | ?PIPEWAIT_TRUE + pipewait = ?PIPEWAIT_TRUE :: ?PIPEWAIT_FALSE | ?PIPEWAIT_TRUE, + streaming = 0 :: 0 | 1 }). -type req() :: #req{}. @@ -596,6 +625,76 @@ delete(PoolName, Url) -> delete(PoolName, Url, Opts) -> req(PoolName, Opts#{url => Url, method => delete}). +-doc #{equiv => async_get/3}. +-spec async_get(katipo_pool:name(), url()) -> async_response(). +async_get(PoolName, Url) -> + async_req(PoolName, #{url => Url, method => get}). + +-doc "Performs an async HTTP GET request. Returns `{ok, Ref}` immediately. The response is delivered as a `{katipo_response, Ref, Response}` message.". +-spec async_get(katipo_pool:name(), url(), opts()) -> async_response(). +async_get(PoolName, Url, Opts) -> + async_req(PoolName, Opts#{url => Url, method => get}). + +-doc #{equiv => async_post/3}. +-spec async_post(katipo_pool:name(), url()) -> async_response(). +async_post(PoolName, Url) -> + async_req(PoolName, #{url => Url, method => post}). + +-doc "Performs an async HTTP POST request. Returns `{ok, Ref}` immediately.". +-spec async_post(katipo_pool:name(), url(), opts()) -> async_response(). +async_post(PoolName, Url, Opts) -> + async_req(PoolName, Opts#{url => Url, method => post}). + +-doc #{equiv => async_put/3}. +-spec async_put(katipo_pool:name(), url()) -> async_response(). +async_put(PoolName, Url) -> + async_req(PoolName, #{url => Url, method => put}). + +-doc "Performs an async HTTP PUT request. Returns `{ok, Ref}` immediately.". +-spec async_put(katipo_pool:name(), url(), opts()) -> async_response(). +async_put(PoolName, Url, Opts) -> + async_req(PoolName, Opts#{url => Url, method => put}). + +-doc #{equiv => async_head/3}. +-spec async_head(katipo_pool:name(), url()) -> async_response(). +async_head(PoolName, Url) -> + async_req(PoolName, #{url => Url, method => head}). + +-doc "Performs an async HTTP HEAD request. Returns `{ok, Ref}` immediately.". +-spec async_head(katipo_pool:name(), url(), opts()) -> async_response(). +async_head(PoolName, Url, Opts) -> + async_req(PoolName, Opts#{url => Url, method => head}). + +-doc #{equiv => async_options/3}. +-spec async_options(katipo_pool:name(), url()) -> async_response(). +async_options(PoolName, Url) -> + async_req(PoolName, #{url => Url, method => options}). + +-doc "Performs an async HTTP OPTIONS request. Returns `{ok, Ref}` immediately.". +-spec async_options(katipo_pool:name(), url(), opts()) -> async_response(). +async_options(PoolName, Url, Opts) -> + async_req(PoolName, Opts#{url => Url, method => options}). + +-doc #{equiv => async_patch/3}. +-spec async_patch(katipo_pool:name(), url()) -> async_response(). +async_patch(PoolName, Url) -> + async_req(PoolName, #{url => Url, method => patch}). + +-doc "Performs an async HTTP PATCH request. Returns `{ok, Ref}` immediately.". +-spec async_patch(katipo_pool:name(), url(), opts()) -> async_response(). +async_patch(PoolName, Url, Opts) -> + async_req(PoolName, Opts#{url => Url, method => patch}). + +-doc #{equiv => async_delete/3}. +-spec async_delete(katipo_pool:name(), url()) -> async_response(). +async_delete(PoolName, Url) -> + async_req(PoolName, #{url => Url, method => delete}). + +-doc "Performs an async HTTP DELETE request. Returns `{ok, Ref}` immediately.". +-spec async_delete(katipo_pool:name(), url(), opts()) -> async_response(). +async_delete(PoolName, Url, Opts) -> + async_req(PoolName, Opts#{url => Url, method => delete}). + -doc "Performs an HTTP request using the full request map.". -spec req(katipo_pool:name(), request()) -> response(). req(PoolName, Opts) @@ -603,12 +702,65 @@ req(PoolName, Opts) case process_opts(Opts) of {ok, #req{url = undefined}} -> {error, error_map(bad_opts, <<"[{url,undefined}]">>)}; + {ok, #req{streaming = 1}} -> + {error, error_map(bad_opts, <<"stream requires async_req">>)}; {ok, Req} -> do_req_with_span(PoolName, Req); {error, _} = Error -> Error end. +-doc """ +Performs an async HTTP request using the full request map. + +Returns `{ok, Ref}` immediately. The response is delivered as a +`{katipo_response, Ref, ResponseMap}` or `{katipo_error, Ref, ErrorMap}` +message to the process specified by the `reply_to` option (defaults to `self()`). + +Use `await/1,2` to block until the response arrives. +""". +-spec async_req(katipo_pool:name(), request()) -> async_response(). +async_req(PoolName, Opts) + when is_map(Opts) -> + ReplyTo = maps:get(reply_to, Opts, self()), + Opts2 = maps:remove(reply_to, Opts), + case process_opts(Opts2) of + {ok, #req{url = undefined}} -> + {error, error_map(bad_opts, <<"[{url,undefined}]">>)}; + {ok, Req} -> + UserRef = make_ref(), + Timeout = ?MODULE:get_timeout(Req), + Req2 = Req#req{timeout = Timeout}, + wpool:cast(PoolName, {async_req, ReplyTo, UserRef, Req2}, random_worker), + {ok, UserRef}; + {error, _} = Error -> + Error + end. + +-doc #{equiv => await/2}. +-spec await(reference()) -> response(). +await(Ref) -> + await(Ref, ?DEFAULT_REQ_TIMEOUT). + +-doc "Blocks until an async response for `Ref` arrives or the timeout expires.". +-spec await(reference(), timeout()) -> response(). +await(Ref, Timeout) -> + receive + {katipo_response, Ref, Response} -> + {ok, Response}; + {katipo_error, Ref, Error} -> + {error, Error} + after Timeout -> + %% Flush any late-arriving response for this Ref + receive + {katipo_response, Ref, _} -> ok; + {katipo_error, Ref, _} -> ok + after 0 -> + ok + end, + {error, #{code => await_timeout, message => <<>>}} + end. + -doc false. do_req_with_span(PoolName, Req) -> #req{method = MethodInt, url = Url} = Req, @@ -705,111 +857,57 @@ init([CurlOpts]) -> end. -doc false. -handle_call(#req{method = Method, - url = Url, - headers = Headers, - cookiejar = CookieJar, - body = Body, - connecttimeout_ms = ConnTimeoutMs, - followlocation = FollowLocation, - ssl_verifyhost = SslVerifyHost, - ssl_verifypeer = SslVerifyPeer, - capath = CAPath, - cacert = CACert, - timeout_ms = TimeoutMs, - maxredirs = MaxRedirs, - timeout = Timeout, - http_auth = HTTPAuth, - username = Username, - password = Password, - proxy = Proxy, - tcp_fastopen = TCPFastOpen, - interface = Interface, - unix_socket_path = UnixSocketPath, - doh_url = DOHURL, - http_version = HTTPVersion, - sslversion = SSLVersion, - verbose = Verbose, - sslcert = SSLCert, - sslkey = SSLKey, - sslkey_blob = SSLKeyBlob, - keypasswd = KeyPasswd, - userpwd = UserPwd, - dns_cache_timeout = DNSCacheTimeout, - ca_cache_timeout = CACacheTimeout, - pipewait = Pipewait}, - From, - State = #state{port = Port, reqs = Reqs}) -> - {Self, Ref} = From, - Opts = [{?CONNECTTIMEOUT_MS, ConnTimeoutMs}, - {?FOLLOWLOCATION, FollowLocation}, - {?SSL_VERIFYHOST, SslVerifyHost}, - {?SSL_VERIFYPEER, SslVerifyPeer}, - {?CAPATH, CAPath}, - {?CACERT, CACert}, - {?TIMEOUT_MS, TimeoutMs}, - {?MAXREDIRS, MaxRedirs}, - {?HTTP_AUTH, HTTPAuth}, - {?USERNAME, Username}, - {?PASSWORD, Password}, - {?PROXY, Proxy}, - {?TCP_FASTOPEN, TCPFastOpen}, - {?INTERFACE, Interface}, - {?UNIX_SOCKET_PATH, UnixSocketPath}, - {?DOH_URL, DOHURL}, - {?HTTP_VERSION, HTTPVersion}, - {?SSLVERSION, SSLVersion}, - {?VERBOSE, Verbose}, - {?SSLCERT, SSLCert}, - {?SSLKEY, SSLKey}, - {?SSLKEY_BLOB, SSLKeyBlob}, - {?KEYPASSWD, KeyPasswd}, - {?USERPWD, UserPwd}, - {?DNS_CACHE_TIMEOUT, DNSCacheTimeout}, - {?CA_CACHE_TIMEOUT, CACacheTimeout}, - {?PIPEWAIT, Pipewait}], - Command = {Self, Ref, Method, Url, Headers, CookieJar, Body, Opts}, - true = port_command(Port, term_to_binary(Command)), +handle_call(Req = #req{timeout = Timeout}, From, State = #state{port = Port, reqs = Reqs}) -> + send_to_port(Port, From, Req), Tref = erlang:start_timer(Timeout, self(), {req_timeout, From}), Reqs2 = Reqs#{From => Tref}, {noreply, State#state{reqs = Reqs2}}. -doc false. +handle_cast({async_req, ReplyTo, UserRef, Req = #req{timeout = Timeout}}, + State = #state{port = Port, reqs = Reqs}) -> + InternalFrom = {self(), make_ref()}, + send_to_port(Port, InternalFrom, Req), + Tref = erlang:start_timer(Timeout, self(), {req_timeout, InternalFrom}), + Reqs2 = Reqs#{InternalFrom => {Tref, {async, ReplyTo, UserRef}}}, + {noreply, State#state{reqs = Reqs2}}; handle_cast(Msg, State) -> logger:error("Unexpected cast: ~p", [Msg]), {noreply, State}. -doc false. handle_info({Port, {data, Data}}, State = #state{port = Port, reqs = Reqs}) -> - {Result, {From, Response}} = - case binary_to_term(Data) of - {ok, {From0, {Status, Headers, CookieJar, Body, Metrics}}} -> - R = #{status => Status, - headers => parse_headers(Headers), - cookiejar => CookieJar, - body => Body}, - {ok, {From0, {R, Metrics}}}; - {error, {From0, {Code, Message, Metrics}}} -> - Error = #{code => Code, message => Message}, - {error, {From0, {Error, Metrics}}} - end, - case maps:find(From, Reqs) of - {ok, Tref} -> - _ = erlang:cancel_timer(Tref), - _ = gen_server:reply(From, {Result, Response}); - error -> - ok - end, - Reqs2 = maps:remove(From, Reqs), - {noreply, State#state{reqs = Reqs2}}; + case binary_to_term(Data) of + {ok, {From, {Status, Headers, CookieJar, Body, Metrics}}} -> + R = #{status => Status, + headers => parse_headers(Headers), + cookiejar => CookieJar, + body => Body}, + handle_complete_response(ok, From, {R, Metrics}, Reqs, State); + {error, {From, {Code, Message, Metrics}}} -> + Error = #{code => Code, message => Message}, + handle_complete_response(error, From, {Error, Metrics}, Reqs, State); + {headers, From, {Status, RawHeaders, CookieJar}} -> + handle_stream_headers(From, Status, RawHeaders, CookieJar, Reqs, State); + {chunk, From, Chunk} -> + handle_stream_chunk(From, Chunk, Reqs, State); + {done, From, Metrics} -> + handle_stream_done(From, Metrics, Reqs, State) + end; handle_info({timeout, Tref, {req_timeout, From}}, State = #state{reqs = Reqs}) -> Reqs2 = case maps:find(From, Reqs) of - {ok, Tref} -> + {ok, Tref} when is_reference(Tref) -> + %% Sync path Error = #{code => operation_timedout, message => <<>>}, Metrics = [], _ = gen_server:reply(From, {error, {Error, Metrics}}), maps:remove(From, Reqs); + {ok, {Tref, {async, ReplyTo, UserRef}}} -> + %% Async path + Error = #{code => operation_timedout, message => <<>>}, + ReplyTo ! {katipo_error, UserRef, Error}, + maps:remove(From, Reqs); error -> Reqs end, @@ -827,6 +925,119 @@ terminate(_Reason, #state{port = Port}) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +handle_complete_response(Result, From, Response, Reqs, State) -> + case maps:find(From, Reqs) of + {ok, Tref} when is_reference(Tref) -> + _ = erlang:cancel_timer(Tref), + _ = gen_server:reply(From, {Result, Response}); + {ok, {Tref, {async, ReplyTo, UserRef}}} -> + {ResponseMap, _Metrics} = Response, + _ = erlang:cancel_timer(Tref), + case Result of + ok -> ReplyTo ! {katipo_response, UserRef, ResponseMap}; + error -> ReplyTo ! {katipo_error, UserRef, ResponseMap} + end; + error -> + ok + end, + Reqs2 = maps:remove(From, Reqs), + {noreply, State#state{reqs = Reqs2}}. + +handle_stream_headers(From, Status, RawHeaders, CookieJar, Reqs, State) -> + case maps:find(From, Reqs) of + {ok, {_Tref, {async, ReplyTo, UserRef}}} -> + Headers = parse_headers(RawHeaders), + ReplyTo ! {katipo_response, UserRef, {status, Status, Headers, CookieJar}}; + _ -> + ok + end, + {noreply, State}. + +handle_stream_chunk(From, Chunk, Reqs, State) -> + case maps:find(From, Reqs) of + {ok, {_Tref, {async, ReplyTo, UserRef}}} -> + ReplyTo ! {katipo_data, UserRef, Chunk}; + _ -> + ok + end, + {noreply, State}. + +handle_stream_done(From, _Metrics, Reqs, State) -> + case maps:find(From, Reqs) of + {ok, {Tref, {async, ReplyTo, UserRef}}} -> + _ = erlang:cancel_timer(Tref), + ReplyTo ! {katipo_done, UserRef}; + _ -> + ok + end, + Reqs2 = maps:remove(From, Reqs), + {noreply, State#state{reqs = Reqs2}}. + +send_to_port(Port, {Self, Ref}, + #req{method = Method, + url = Url, + headers = Headers, + cookiejar = CookieJar, + body = Body, + connecttimeout_ms = ConnTimeoutMs, + followlocation = FollowLocation, + ssl_verifyhost = SslVerifyHost, + ssl_verifypeer = SslVerifyPeer, + capath = CAPath, + cacert = CACert, + timeout_ms = TimeoutMs, + maxredirs = MaxRedirs, + http_auth = HTTPAuth, + username = Username, + password = Password, + proxy = Proxy, + tcp_fastopen = TCPFastOpen, + interface = Interface, + unix_socket_path = UnixSocketPath, + doh_url = DOHURL, + http_version = HTTPVersion, + sslversion = SSLVersion, + verbose = Verbose, + sslcert = SSLCert, + sslkey = SSLKey, + sslkey_blob = SSLKeyBlob, + keypasswd = KeyPasswd, + userpwd = UserPwd, + dns_cache_timeout = DNSCacheTimeout, + ca_cache_timeout = CACacheTimeout, + pipewait = Pipewait, + streaming = Streaming}) -> + Opts = [{?CONNECTTIMEOUT_MS, ConnTimeoutMs}, + {?FOLLOWLOCATION, FollowLocation}, + {?SSL_VERIFYHOST, SslVerifyHost}, + {?SSL_VERIFYPEER, SslVerifyPeer}, + {?CAPATH, CAPath}, + {?CACERT, CACert}, + {?TIMEOUT_MS, TimeoutMs}, + {?MAXREDIRS, MaxRedirs}, + {?HTTP_AUTH, HTTPAuth}, + {?USERNAME, Username}, + {?PASSWORD, Password}, + {?PROXY, Proxy}, + {?TCP_FASTOPEN, TCPFastOpen}, + {?INTERFACE, Interface}, + {?UNIX_SOCKET_PATH, UnixSocketPath}, + {?DOH_URL, DOHURL}, + {?HTTP_VERSION, HTTPVersion}, + {?SSLVERSION, SSLVersion}, + {?VERBOSE, Verbose}, + {?SSLCERT, SSLCert}, + {?SSLKEY, SSLKey}, + {?SSLKEY_BLOB, SSLKeyBlob}, + {?KEYPASSWD, KeyPasswd}, + {?USERPWD, UserPwd}, + {?DNS_CACHE_TIMEOUT, DNSCacheTimeout}, + {?CA_CACHE_TIMEOUT, CACacheTimeout}, + {?PIPEWAIT, Pipewait}, + {?STREAMING, Streaming}], + Command = {Self, Ref, Method, Url, Headers, CookieJar, Body, Opts}, + true = port_command(Port, term_to_binary(Command)). + -spec headers_to_binary(headers()) -> [binary()]. headers_to_binary(Headers) -> [iolist_to_binary([K, <<": ">>, V]) || {K, V} <- Headers]. @@ -1036,6 +1247,12 @@ opt(pipewait, true, {Req, Errors}) -> {Req#req{pipewait = ?PIPEWAIT_TRUE}, Errors}; opt(pipewait, false, {Req, Errors}) -> {Req#req{pipewait = ?PIPEWAIT_FALSE}, Errors}; +opt(reply_to, Pid, {Req, Errors}) when is_pid(Pid) -> + {Req, Errors}; +opt(stream, true, {Req, Errors}) -> + {Req#req{streaming = 1}, Errors}; +opt(stream, false, {Req, Errors}) -> + {Req#req{streaming = 0}, Errors}; opt(K, V, {Req, Errors}) -> {Req, [{K, V} | Errors]}. diff --git a/test/katipo_SUITE.erl b/test/katipo_SUITE.erl index 114b79c..3f48190 100644 --- a/test/katipo_SUITE.erl +++ b/test/katipo_SUITE.erl @@ -83,6 +83,10 @@ init_per_group(digest, Config) -> {req_opts, #{http_version => curl_http_version_1_1, ssl_verifyhost => false, ssl_verifypeer => false}}] ++ Config; +init_per_group(async, Config) -> + [{httpbin_base, <<"https://localhost:8443">>}, + {req_opts, #{ssl_verifyhost => false, + ssl_verifypeer => false}}] ++ Config; init_per_group(_, Config) -> Config. @@ -195,6 +199,23 @@ groups() -> [badssl_client_cert]}, {port, [], [max_total_connections]}, + {async, [parallel], + [async_get, + async_get_with_opts, + async_post, + async_req, + async_reply_to, + async_error, + async_timeout, + async_await, + async_await_timeout, + async_await_explicit_timeout, + async_await_own_timeout, + async_multiple_outstanding, + async_stream_get, + async_stream_headers_before_data, + async_stream_error, + sync_stream_rejected]}, {otel, [], [otel_span_created, otel_metrics_recorded, @@ -217,6 +238,7 @@ all() -> {group, pool}, {group, https_mutual}, {group, port}, + {group, async}, {group, otel}], %% HTTP/2 tests always run (local httpbin supports HTTP/2) Http2Groups = [{group, http2}], @@ -1174,3 +1196,184 @@ otel_url_sanitization(_Config) -> ?assertEqual(<<"example.com">>, Host6), ok. + +%% Async API tests + +async_get(Config) -> + {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), + Url = httpbin_url(Config, <<"/get">>), + {ok, Ref} = katipo:async_get(?POOL, Url, Opts), + receive + {katipo_response, Ref, #{status := 200}} -> ok + after 10000 -> + ct:fail(timeout) + end. + +async_get_with_opts(Config) -> + {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), + Url = httpbin_url(Config, <<"/get?a=1">>), + {ok, Ref} = katipo:async_get(?POOL, Url, Opts), + receive + {katipo_response, Ref, #{status := 200, body := Body}} -> + Json = jsx:decode(Body), + ?assertEqual(<<"1">>, maps:get(<<"a">>, maps:get(<<"args">>, Json))) + after 10000 -> + ct:fail(timeout) + end. + +async_post(Config) -> + {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), + Url = httpbin_url(Config, <<"/post">>), + {ok, Ref} = katipo:async_post(?POOL, Url, + Opts#{headers => [{<<"Content-Type">>, <<"application/json">>}], + body => <<"hello">>}), + receive + {katipo_response, Ref, #{status := 200, body := Body}} -> + Json = jsx:decode(Body), + ?assertEqual(<<"hello">>, maps:get(<<"data">>, Json)) + after 10000 -> + ct:fail(timeout) + end. + +async_req(Config) -> + {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), + Url = httpbin_url(Config, <<"/get">>), + {ok, Ref} = katipo:async_req(?POOL, Opts#{url => Url, method => get}), + receive + {katipo_response, Ref, #{status := 200}} -> ok + after 10000 -> + ct:fail(timeout) + end. + +async_reply_to(Config) -> + {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), + Url = httpbin_url(Config, <<"/get">>), + Self = self(), + Pid = spawn_link(fun() -> + receive + {katipo_response, _Ref, #{status := 200}} -> + Self ! async_reply_to_ok + after 10000 -> + Self ! async_reply_to_fail + end + end), + {ok, _Ref} = katipo:async_get(?POOL, Url, Opts#{reply_to => Pid}), + receive + async_reply_to_ok -> ok; + async_reply_to_fail -> ct:fail(reply_to_timeout) + after 15000 -> + ct:fail(timeout) + end. + +async_error(_Config) -> + {error, #{code := bad_opts}} = + katipo:async_get(?POOL, <<"https://localhost">>, #{bad_option => bad_value}). + +async_timeout(_Config) -> + {ok, Ref} = katipo:async_get(?POOL, <<"http://google.com">>, + #{connecttimeout_ms => 1}), + receive + {katipo_error, Ref, #{code := operation_timedout}} -> ok + after 10000 -> + ct:fail(timeout) + end. + +async_await(Config) -> + {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), + Url = httpbin_url(Config, <<"/get">>), + {ok, Ref} = katipo:async_get(?POOL, Url, Opts), + {ok, #{status := 200}} = katipo:await(Ref). + +async_await_timeout(_Config) -> + %% Request itself times out — await collects the error + {ok, Ref} = katipo:async_get(?POOL, <<"http://google.com">>, + #{connecttimeout_ms => 1}), + {error, #{code := operation_timedout}} = katipo:await(Ref). + +async_await_explicit_timeout(Config) -> + %% await/2 with an explicit timeout that is long enough + {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), + Url = httpbin_url(Config, <<"/get">>), + {ok, Ref} = katipo:async_get(?POOL, Url, Opts), + {ok, #{status := 200}} = katipo:await(Ref, 10000). + +async_await_own_timeout(_Config) -> + %% await/2 timeout fires before the response arrives + {ok, Ref} = katipo:async_get(?POOL, <<"http://google.com">>, + #{timeout_ms => 30000, connecttimeout_ms => 30000}), + {error, #{code := await_timeout}} = katipo:await(Ref, 1). + +async_multiple_outstanding(Config) -> + {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), + Urls = [httpbin_url(Config, <<"/get?n=", (integer_to_binary(N))/binary>>) + || N <- lists:seq(1, 5)], + Refs = [{N, begin + {ok, Ref} = katipo:async_get(?POOL, Url, Opts), + Ref + end} + || {N, Url} <- lists:zip(lists:seq(1, 5), Urls)], + Results = [{N, katipo:await(Ref)} || {N, Ref} <- Refs], + lists:foreach(fun({N, {ok, #{status := 200, body := Body}}}) -> + Json = jsx:decode(Body), + Expected = integer_to_binary(N), + ?assertEqual(Expected, maps:get(<<"n">>, maps:get(<<"args">>, Json))) + end, Results). + +async_stream_get(Config) -> + {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), + Url = httpbin_url(Config, <<"/get?a=1">>), + {ok, Ref} = katipo:async_get(?POOL, Url, Opts#{stream => true}), + %% Collect status + headers + {Status, Headers, CookieJar} = receive + {katipo_response, Ref, {status, S, H, C}} -> {S, H, C} + after 10000 -> ct:fail(no_headers) + end, + ?assertEqual(200, Status), + ?assert(is_list(Headers)), + ?assert(is_list(CookieJar)), + %% Collect body chunks + Body = collect_stream_body(Ref), + Json = jsx:decode(Body), + ?assertEqual(<<"1">>, maps:get(<<"a">>, maps:get(<<"args">>, Json))). + +async_stream_headers_before_data(Config) -> + {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), + Url = httpbin_url(Config, <<"/bytes/1024">>), + {ok, Ref} = katipo:async_get(?POOL, Url, Opts#{stream => true}), + %% First message must be headers, not data + receive + {katipo_response, Ref, {status, 200, _Headers, _CookieJar}} -> ok; + {katipo_data, Ref, _} -> ct:fail(data_before_headers) + after 10000 -> + ct:fail(timeout) + end, + %% Drain remaining messages + _ = collect_stream_body(Ref). + +async_stream_error(_Config) -> + {ok, Ref} = katipo:async_get(?POOL, <<"https://localhost">>, + #{stream => true, connecttimeout_ms => 1}), + receive + {katipo_error, Ref, #{code := _}} -> ok + after 10000 -> + ct:fail(timeout) + end. + +collect_stream_body(Ref) -> + collect_stream_body(Ref, []). + +collect_stream_body(Ref, Acc) -> + receive + {katipo_data, Ref, Chunk} -> + collect_stream_body(Ref, [Chunk | Acc]); + {katipo_done, Ref} -> + iolist_to_binary(lists:reverse(Acc)) + after 10000 -> + ct:fail(stream_timeout) + end. + +sync_stream_rejected(Config) -> + {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), + Url = httpbin_url(Config, <<"/get">>), + {error, #{code := bad_opts, message := <<"stream requires async_req">>}} = + katipo:get(?POOL, Url, Opts#{stream => true}). From 2d9edee139b65a9f1756c9a1141492ec3ddb5166 Mon Sep 17 00:00:00 2001 From: Paul Oliver Date: Sun, 22 Mar 2026 11:20:04 +1300 Subject: [PATCH 2/3] feat: add streaming request body support and opaque async handle Introduce stream_body option for incrementally uploading request bodies via send_body/2 and finish_body/1. Uses curl's CURLOPT_READFUNCTION with pause/unpause to integrate with the existing libevent loop. Replace raw reference() return from async_* functions with an opaque async_handle() that embeds the worker name (for body chunk routing) and a monitor ref (for worker death detection in await/1,2). The worker monitors the caller process for streaming body requests and auto-finishes the upload if the caller dies, preventing hangs. Also includes metrics in async responses, matching sync behavior. --- c_src/katipo.c | 188 ++++++++++++++++++++++++++++++--- src/katipo.erl | 228 +++++++++++++++++++++++++++++++--------- test/katipo_SUITE.erl | 236 ++++++++++++++++++++++++++++++++++++------ 3 files changed, 563 insertions(+), 89 deletions(-) diff --git a/c_src/katipo.c b/c_src/katipo.c index c70d3b0..cf4e462 100644 --- a/c_src/katipo.c +++ b/c_src/katipo.c @@ -52,6 +52,7 @@ #define K_CURLOPT_CA_CACHE_TIMEOUT 32 #define K_CURLOPT_PIPEWAIT 33 #define K_CURLOPT_STREAMING 34 +#define K_CURLOPT_STREAM_BODY 35 #define K_CURLAUTH_BASIC 100 #define K_CURLAUTH_DIGEST 101 @@ -62,6 +63,8 @@ struct bufferevent *to_erlang; struct bufferevent *from_erlang; +typedef struct _ConnInfo ConnInfo; + typedef struct _GlobalInfo { struct event_base *evbase; struct event *timer_event; @@ -70,9 +73,10 @@ typedef struct _GlobalInfo { int still_running; size_t to_get; curl_version_info_data *ver; + ConnInfo *active_conns; } GlobalInfo; -typedef struct _ConnInfo { +struct _ConnInfo { CURL *easy; char *url; erlang_pid *pid; @@ -90,6 +94,13 @@ typedef struct _ConnInfo { long post_data_size; int streaming; int headers_sent; + /* streaming request body fields */ + int stream_body; + int body_finished; + char *upload_buf; + size_t upload_size; + size_t upload_sent; + ConnInfo *next; // metrics double total_time; double namelookup_time; @@ -98,7 +109,7 @@ typedef struct _ConnInfo { double pretransfer_time; double redirect_time; double starttransfer_time; -} ConnInfo; +}; typedef struct _SockInfo { curl_socket_t sockfd; @@ -140,6 +151,7 @@ typedef struct _EasyOpts { long curlopt_ca_cache_timeout; long curlopt_pipewait; long curlopt_streaming; + long curlopt_stream_body; } EasyOpts; static const char *curl_error_code(CURLcode error) { @@ -659,11 +671,24 @@ static void check_multi_info(GlobalInfo *global) { } curl_multi_remove_handle(global->multi, easy); + + if (conn->stream_body) { + ConnInfo **pp = &global->active_conns; + while (*pp) { + if (*pp == conn) { + *pp = conn->next; + break; + } + pp = &(*pp)->next; + } + } + free(conn->url); free(conn->pid); free(conn->ref); free(conn->memory); free(conn->post_data); + free(conn->upload_buf); curl_slist_free_all(conn->req_cookies); curl_slist_free_all(conn->req_headers); curl_slist_free_all(conn->resp_headers); @@ -817,24 +842,60 @@ static size_t header_cb(void *ptr, size_t size, size_t nmemb, void *data) { return realsize; } +static size_t upload_read_cb(char *buffer, size_t size, size_t nitems, void *userdata) { + ConnInfo *conn = (ConnInfo *)userdata; + size_t available = conn->upload_size - conn->upload_sent; + if (available == 0) { + if (conn->body_finished) return 0; /* EOF */ + return CURL_READFUNC_PAUSE; /* no data yet, pause */ + } + size_t to_copy = (available < size * nitems) ? available : size * nitems; + memcpy(buffer, conn->upload_buf + conn->upload_sent, to_copy); + conn->upload_sent += to_copy; + /* Compact buffer when fully consumed */ + if (conn->upload_sent == conn->upload_size) { + conn->upload_sent = 0; + conn->upload_size = 0; + } + return to_copy; +} + static void set_method(long method, ConnInfo *conn) { switch (method) { case KATIPO_GET: break; case KATIPO_POST: curl_easy_setopt(conn->easy, CURLOPT_POST, 1L); - curl_easy_setopt(conn->easy, CURLOPT_POSTFIELDS, conn->post_data); - curl_easy_setopt(conn->easy, CURLOPT_POSTFIELDSIZE, conn->post_data_size); + if (conn->stream_body) { + curl_easy_setopt(conn->easy, CURLOPT_READFUNCTION, upload_read_cb); + curl_easy_setopt(conn->easy, CURLOPT_READDATA, conn); + curl_easy_setopt(conn->easy, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t)-1); + } else { + curl_easy_setopt(conn->easy, CURLOPT_POSTFIELDS, conn->post_data); + curl_easy_setopt(conn->easy, CURLOPT_POSTFIELDSIZE, conn->post_data_size); + } break; case KATIPO_PUT: - curl_easy_setopt(conn->easy, CURLOPT_CUSTOMREQUEST, "PUT"); - curl_easy_setopt(conn->easy, CURLOPT_POSTFIELDS, conn->post_data); - curl_easy_setopt(conn->easy, CURLOPT_POSTFIELDSIZE, conn->post_data_size); + if (conn->stream_body) { + curl_easy_setopt(conn->easy, CURLOPT_UPLOAD, 1L); + curl_easy_setopt(conn->easy, CURLOPT_READFUNCTION, upload_read_cb); + curl_easy_setopt(conn->easy, CURLOPT_READDATA, conn); + } else { + curl_easy_setopt(conn->easy, CURLOPT_CUSTOMREQUEST, "PUT"); + curl_easy_setopt(conn->easy, CURLOPT_POSTFIELDS, conn->post_data); + curl_easy_setopt(conn->easy, CURLOPT_POSTFIELDSIZE, conn->post_data_size); + } break; case KATIPO_PATCH: curl_easy_setopt(conn->easy, CURLOPT_CUSTOMREQUEST, "PATCH"); - curl_easy_setopt(conn->easy, CURLOPT_POSTFIELDS, conn->post_data); - curl_easy_setopt(conn->easy, CURLOPT_POSTFIELDSIZE, conn->post_data_size); + if (conn->stream_body) { + curl_easy_setopt(conn->easy, CURLOPT_UPLOAD, 1L); + curl_easy_setopt(conn->easy, CURLOPT_READFUNCTION, upload_read_cb); + curl_easy_setopt(conn->easy, CURLOPT_READDATA, conn); + } else { + curl_easy_setopt(conn->easy, CURLOPT_POSTFIELDS, conn->post_data); + curl_easy_setopt(conn->easy, CURLOPT_POSTFIELDSIZE, conn->post_data_size); + } break; case KATIPO_HEAD: curl_easy_setopt(conn->easy, CURLOPT_CUSTOMREQUEST, "HEAD"); @@ -868,6 +929,12 @@ static void new_conn(long method, char *url, struct curl_slist *req_headers, conn->resp_headers = NULL; conn->streaming = eopts.curlopt_streaming; conn->headers_sent = 0; + conn->stream_body = eopts.curlopt_stream_body; + conn->body_finished = 0; + conn->upload_buf = NULL; + conn->upload_size = 0; + conn->upload_sent = 0; + conn->next = NULL; conn->easy = curl_easy_init(); if (!conn->easy) { @@ -1033,10 +1100,85 @@ static void new_conn(long method, char *url, struct curl_slist *req_headers, free(eopts.curlopt_userpwd); set_method(method, conn); + + if (conn->stream_body) { + conn->next = global->active_conns; + global->active_conns = conn; + } + rc = curl_multi_add_handle(global->multi, conn->easy); mcode_or_die("new_conn: curl_multi_add_handle", rc); } +static int refs_equal(erlang_ref *a, erlang_ref *b) { + if (a->len != b->len || a->creation != b->creation) return 0; + if (strcmp(a->node, b->node) != 0) return 0; + for (int i = 0; i < a->len; i++) { + if (a->n[i] != b->n[i]) return 0; + } + return 1; +} + +static ConnInfo *find_conn(GlobalInfo *global, erlang_ref *ref) { + ConnInfo *conn = global->active_conns; + while (conn) { + if (refs_equal(conn->ref, ref)) return conn; + conn = conn->next; + } + return NULL; +} + +static void handle_body_message(char *buf, int index, GlobalInfo *global) { + char atom[MAXATOMLEN]; + erlang_ref bref; + int erl_type, size; + long sizel; + + if (ei_decode_atom(buf, &index, atom)) { + errx(2, "Couldn't decode body message atom"); + } + + /* Skip pid (not needed for lookup), decode ref for ConnInfo lookup */ + if (ei_skip_term(buf, &index) || + ei_decode_ref(buf, &index, &bref)) { + errx(2, "Couldn't decode body message pid/ref"); + } + + ConnInfo *conn = find_conn(global, &bref); + if (!conn) return; + + if (strcmp(atom, "body_chunk") == 0) { + if (ei_get_type(buf, &index, &erl_type, &size)) { + errx(2, "Couldn't read body chunk size"); + } + char *chunk_data = (char *)malloc(size); + if (ei_decode_binary(buf, &index, chunk_data, &sizel)) { + errx(2, "Couldn't read body chunk data"); + } + + /* Ignore chunks arriving after body_done to avoid confusing curl */ + if (conn->body_finished) { + free(chunk_data); + return; + } + + char *new_buf = (char *)realloc(conn->upload_buf, conn->upload_size + (size_t)sizel); + if (!new_buf) { + free(chunk_data); + errx(2, "realloc failed for upload buffer"); + } + conn->upload_buf = new_buf; + memcpy(conn->upload_buf + conn->upload_size, chunk_data, (size_t)sizel); + conn->upload_size += (size_t)sizel; + free(chunk_data); + + curl_easy_pause(conn->easy, CURLPAUSE_CONT); + } else if (strcmp(atom, "body_done") == 0) { + conn->body_finished = 1; + curl_easy_pause(conn->easy, CURLPAUSE_CONT); + } +} + static void erl_input(struct bufferevent *ev, void *arg) { u_int32_t len; size_t data_read; @@ -1095,11 +1237,29 @@ static void erl_input(struct bufferevent *ev, void *arg) { index = 0; + if (ei_decode_version(buf, &index, &version) || + ei_decode_tuple_header(buf, &index, &arity)) { + errx(2, "Couldn't read message header"); + } + + /* Peek at first element to distinguish request vs body message */ + int first_type, first_size; + if (ei_get_type(buf, &index, &first_type, &first_size)) { + errx(2, "Couldn't peek at first element"); + } + + /* Atom = body control message (body_chunk or body_done) */ + if (first_type == ERL_ATOM_EXT || first_type == ERL_ATOM_UTF8_EXT || + first_type == ERL_SMALL_ATOM_UTF8_EXT) { + handle_body_message(buf, index, global); + free(buf); + continue; + } + + /* Otherwise it's a request message starting with a pid */ pid = malloc(sizeof(*pid)); ref = malloc(sizeof(*ref)); - if (ei_decode_version(buf, &index, &version) || - ei_decode_tuple_header(buf, &index, &arity) || - ei_decode_pid(buf, &index, pid) || + if (ei_decode_pid(buf, &index, pid) || ei_decode_ref(buf, &index, ref) || ei_decode_long(buf, &index, &method) || ei_get_type(buf, &index, &erl_type, &size)) { @@ -1260,6 +1420,9 @@ static void erl_input(struct bufferevent *ev, void *arg) { case K_CURLOPT_STREAMING: eopts.curlopt_streaming = eopt_long; break; + case K_CURLOPT_STREAM_BODY: + eopts.curlopt_stream_body = eopt_long; + break; default: errx(2, "Unknown eopt long value %ld", eopt); } @@ -1418,6 +1581,7 @@ int main(int argc, char **argv) { } global.timer_event = evtimer_new(global.evbase, timer_cb, &global); global.to_get = 0; + global.active_conns = NULL; ver = curl_version_info(CURLVERSION_NOW); global.ver = ver; diff --git a/src/katipo.erl b/src/katipo.erl index 1f26eed..829c848 100644 --- a/src/katipo.erl +++ b/src/katipo.erl @@ -72,6 +72,9 @@ All request functions return `t:response/0`: -export([async_delete/3]). -export([await/1]). -export([await/2]). +-export([ref/1]). +-export([send_body/2]). +-export([finish_body/1]). -export([check_opts/1]). @@ -120,7 +123,13 @@ All request functions return `t:response/0`: -endif. -record(state, {port :: port(), - reqs = #{} :: map()}). + reqs = #{} :: map(), + user_refs = #{} :: map(), + caller_monitors = #{} :: map()}). + +-record(async_handle, {ref :: reference(), + worker :: atom(), + monitor :: reference()}). -define(GET, 0). -define(POST, 1). @@ -158,6 +167,7 @@ All request functions return `t:response/0`: -define(CA_CACHE_TIMEOUT, 32). -define(PIPEWAIT, 33). -define(STREAMING, 34). +-define(STREAM_BODY, 35). -define(DEFAULT_REQ_TIMEOUT, 30000). -define(FOLLOWLOCATION_TRUE, 1). @@ -308,7 +318,8 @@ All request functions return `t:response/0`: curl_last | %% returned by us, not curl bad_opts | - await_timeout. + await_timeout | + worker_died. -type curlmopt() :: %% curlmopt_chunk_length_penalty_size | @@ -392,7 +403,8 @@ All request functions return `t:response/0`: userpwd => userpwd(), dns_cache_timeout => integer(), ca_cache_timeout => integer(), - pipewait => boolean()}. + pipewait => boolean(), + stream_body => boolean()}. -type opts() :: #{reply_to => pid(), stream => boolean(), headers => headers(), @@ -423,7 +435,8 @@ All request functions return `t:response/0`: userpwd => userpwd(), dns_cache_timeout => integer(), ca_cache_timeout => integer(), - pipewait => boolean()}. + pipewait => boolean(), + stream_body => boolean()}. -export_type([opts/0]). -type metrics() :: proplists:proplist(). -type response() :: {ok, #{status := status(), @@ -433,7 +446,8 @@ All request functions return `t:response/0`: metrics => proplists:proplist()}} | {error, #{code := error_code(), message := error_msg()}}. --type async_response() :: {ok, reference()} | +-opaque async_handle() :: #async_handle{}. +-type async_response() :: {ok, async_handle()} | {error, #{code := error_code(), message := error_msg()}}. -type http_auth() :: basic | digest | ntlm | negotiate. @@ -493,6 +507,7 @@ All request functions return `t:response/0`: -export_type([sslkey_blob/0]). -export_type([userpwd/0]). -export_type([async_response/0]). +-export_type([async_handle/0]). -record(req, { method = ?GET :: method_int(), @@ -528,7 +543,8 @@ All request functions return `t:response/0`: dns_cache_timeout = 60 :: integer(), ca_cache_timeout = 86400 :: integer(), pipewait = ?PIPEWAIT_TRUE :: ?PIPEWAIT_FALSE | ?PIPEWAIT_TRUE, - streaming = 0 :: 0 | 1 + streaming = 0 :: 0 | 1, + stream_body = 0 :: 0 | 1 }). -type req() :: #req{}. @@ -630,7 +646,7 @@ delete(PoolName, Url, Opts) -> async_get(PoolName, Url) -> async_req(PoolName, #{url => Url, method => get}). --doc "Performs an async HTTP GET request. Returns `{ok, Ref}` immediately. The response is delivered as a `{katipo_response, Ref, Response}` message.". +-doc "Performs an async HTTP GET request. Returns `{ok, Handle}` immediately. The response is delivered as a `{katipo_response, Ref, Response}` message.". -spec async_get(katipo_pool:name(), url(), opts()) -> async_response(). async_get(PoolName, Url, Opts) -> async_req(PoolName, Opts#{url => Url, method => get}). @@ -640,7 +656,7 @@ async_get(PoolName, Url, Opts) -> async_post(PoolName, Url) -> async_req(PoolName, #{url => Url, method => post}). --doc "Performs an async HTTP POST request. Returns `{ok, Ref}` immediately.". +-doc "Performs an async HTTP POST request. Returns `{ok, Handle}` immediately.". -spec async_post(katipo_pool:name(), url(), opts()) -> async_response(). async_post(PoolName, Url, Opts) -> async_req(PoolName, Opts#{url => Url, method => post}). @@ -650,7 +666,7 @@ async_post(PoolName, Url, Opts) -> async_put(PoolName, Url) -> async_req(PoolName, #{url => Url, method => put}). --doc "Performs an async HTTP PUT request. Returns `{ok, Ref}` immediately.". +-doc "Performs an async HTTP PUT request. Returns `{ok, Handle}` immediately.". -spec async_put(katipo_pool:name(), url(), opts()) -> async_response(). async_put(PoolName, Url, Opts) -> async_req(PoolName, Opts#{url => Url, method => put}). @@ -660,7 +676,7 @@ async_put(PoolName, Url, Opts) -> async_head(PoolName, Url) -> async_req(PoolName, #{url => Url, method => head}). --doc "Performs an async HTTP HEAD request. Returns `{ok, Ref}` immediately.". +-doc "Performs an async HTTP HEAD request. Returns `{ok, Handle}` immediately.". -spec async_head(katipo_pool:name(), url(), opts()) -> async_response(). async_head(PoolName, Url, Opts) -> async_req(PoolName, Opts#{url => Url, method => head}). @@ -670,7 +686,7 @@ async_head(PoolName, Url, Opts) -> async_options(PoolName, Url) -> async_req(PoolName, #{url => Url, method => options}). --doc "Performs an async HTTP OPTIONS request. Returns `{ok, Ref}` immediately.". +-doc "Performs an async HTTP OPTIONS request. Returns `{ok, Handle}` immediately.". -spec async_options(katipo_pool:name(), url(), opts()) -> async_response(). async_options(PoolName, Url, Opts) -> async_req(PoolName, Opts#{url => Url, method => options}). @@ -680,7 +696,7 @@ async_options(PoolName, Url, Opts) -> async_patch(PoolName, Url) -> async_req(PoolName, #{url => Url, method => patch}). --doc "Performs an async HTTP PATCH request. Returns `{ok, Ref}` immediately.". +-doc "Performs an async HTTP PATCH request. Returns `{ok, Handle}` immediately.". -spec async_patch(katipo_pool:name(), url(), opts()) -> async_response(). async_patch(PoolName, Url, Opts) -> async_req(PoolName, Opts#{url => Url, method => patch}). @@ -690,7 +706,7 @@ async_patch(PoolName, Url, Opts) -> async_delete(PoolName, Url) -> async_req(PoolName, #{url => Url, method => delete}). --doc "Performs an async HTTP DELETE request. Returns `{ok, Ref}` immediately.". +-doc "Performs an async HTTP DELETE request. Returns `{ok, Handle}` immediately.". -spec async_delete(katipo_pool:name(), url(), opts()) -> async_response(). async_delete(PoolName, Url, Opts) -> async_req(PoolName, Opts#{url => Url, method => delete}). @@ -704,6 +720,8 @@ req(PoolName, Opts) {error, error_map(bad_opts, <<"[{url,undefined}]">>)}; {ok, #req{streaming = 1}} -> {error, error_map(bad_opts, <<"stream requires async_req">>)}; + {ok, #req{stream_body = 1}} -> + {error, error_map(bad_opts, <<"stream_body requires async_req">>)}; {ok, Req} -> do_req_with_span(PoolName, Req); {error, _} = Error -> @@ -713,11 +731,15 @@ req(PoolName, Opts) -doc """ Performs an async HTTP request using the full request map. -Returns `{ok, Ref}` immediately. The response is delivered as a -`{katipo_response, Ref, ResponseMap}` or `{katipo_error, Ref, ErrorMap}` -message to the process specified by the `reply_to` option (defaults to `self()`). +Returns `{ok, Handle}` immediately where `Handle` is an opaque `t:async_handle/0`. +The response is delivered as a `{katipo_response, Ref, ResponseMap}` or +`{katipo_error, Ref, ErrorMap}` message to the process specified by the `reply_to` +option (defaults to `self()`). Use `ref/1` to extract the `Ref` for matching. Use `await/1,2` to block until the response arrives. + +For streaming request bodies, pass `stream_body => true` and use +`send_body/2` and `finish_body/1` to send the body incrementally. """. -spec async_req(katipo_pool:name(), request()) -> async_response(). async_req(PoolName, Opts) @@ -731,20 +753,43 @@ async_req(PoolName, Opts) UserRef = make_ref(), Timeout = ?MODULE:get_timeout(Req), Req2 = Req#req{timeout = Timeout}, - wpool:cast(PoolName, {async_req, ReplyTo, UserRef, Req2}, random_worker), - {ok, UserRef}; + WorkerName = wpool_pool:random_worker(PoolName), + MonRef = erlang:monitor(process, WorkerName), + gen_server:cast(WorkerName, {async_req, ReplyTo, UserRef, Req2}), + {ok, #async_handle{ref = UserRef, worker = WorkerName, monitor = MonRef}}; {error, _} = Error -> Error end. -doc #{equiv => await/2}. --spec await(reference()) -> response(). -await(Ref) -> +-spec await(async_handle() | reference()) -> response(). +await(#async_handle{} = Handle) -> + await(Handle, ?DEFAULT_REQ_TIMEOUT); +await(Ref) when is_reference(Ref) -> await(Ref, ?DEFAULT_REQ_TIMEOUT). --doc "Blocks until an async response for `Ref` arrives or the timeout expires.". --spec await(reference(), timeout()) -> response(). -await(Ref, Timeout) -> +-doc "Blocks until an async response arrives or the timeout expires.". +-spec await(async_handle() | reference(), timeout()) -> response(). +await(#async_handle{ref = Ref, monitor = MonRef}, Timeout) -> + Result = receive + {katipo_response, Ref, Response} -> + {ok, Response}; + {katipo_error, Ref, Error} -> + {error, Error}; + {'DOWN', MonRef, process, _, _} -> + {error, #{code => worker_died, message => <<>>}} + after Timeout -> + receive + {katipo_response, Ref, _} -> ok; + {katipo_error, Ref, _} -> ok + after 0 -> + ok + end, + {error, #{code => await_timeout, message => <<>>}} + end, + erlang:demonitor(MonRef, [flush]), + Result; +await(Ref, Timeout) when is_reference(Ref) -> receive {katipo_response, Ref, Response} -> {ok, Response}; @@ -761,6 +806,22 @@ await(Ref, Timeout) -> {error, #{code => await_timeout, message => <<>>}} end. +-doc "Extracts the reference from an async handle for use in receive patterns.". +-spec ref(async_handle()) -> reference(). +ref(#async_handle{ref = Ref}) -> Ref. + +-doc "Sends a chunk of request body data for a streaming upload.". +-spec send_body(async_handle(), iodata()) -> ok. +send_body(#async_handle{worker = W, ref = Ref}, Data) -> + gen_server:cast(W, {body_chunk, Ref, iolist_to_binary(Data)}), + ok. + +-doc "Signals that the streaming request body is complete.". +-spec finish_body(async_handle()) -> ok. +finish_body(#async_handle{worker = W, ref = Ref}) -> + gen_server:cast(W, {body_done, Ref}), + ok. + -doc false. do_req_with_span(PoolName, Req) -> #req{method = MethodInt, url = Url} = Req, @@ -864,13 +925,40 @@ handle_call(Req = #req{timeout = Timeout}, From, State = #state{port = Port, req {noreply, State#state{reqs = Reqs2}}. -doc false. -handle_cast({async_req, ReplyTo, UserRef, Req = #req{timeout = Timeout}}, - State = #state{port = Port, reqs = Reqs}) -> +handle_cast({async_req, ReplyTo, UserRef, Req = #req{timeout = Timeout, stream_body = StreamBody}}, + State = #state{port = Port, reqs = Reqs, user_refs = URefs, + caller_monitors = CMs}) -> InternalFrom = {self(), make_ref()}, send_to_port(Port, InternalFrom, Req), Tref = erlang:start_timer(Timeout, self(), {req_timeout, InternalFrom}), Reqs2 = Reqs#{InternalFrom => {Tref, {async, ReplyTo, UserRef}}}, - {noreply, State#state{reqs = Reqs2}}; + URefs2 = URefs#{UserRef => InternalFrom}, + CMs2 = case StreamBody of + 1 -> + CallerMonRef = erlang:monitor(process, ReplyTo), + CMs#{UserRef => CallerMonRef}; + 0 -> + CMs + end, + {noreply, State#state{reqs = Reqs2, user_refs = URefs2, caller_monitors = CMs2}}; +handle_cast({body_chunk, UserRef, Data}, + State = #state{port = Port, user_refs = URefs}) -> + case maps:find(UserRef, URefs) of + {ok, {Pid, Ref}} -> + true = port_command(Port, term_to_binary({body_chunk, Pid, Ref, Data})); + error -> + ok + end, + {noreply, State}; +handle_cast({body_done, UserRef}, + State = #state{port = Port, user_refs = URefs}) -> + case maps:find(UserRef, URefs) of + {ok, {Pid, Ref}} -> + true = port_command(Port, term_to_binary({body_done, Pid, Ref})); + error -> + ok + end, + {noreply, State}; handle_cast(Msg, State) -> logger:error("Unexpected cast: ~p", [Msg]), {noreply, State}. @@ -894,24 +982,42 @@ handle_info({Port, {data, Data}}, State = #state{port = Port, reqs = Reqs}) -> {done, From, Metrics} -> handle_stream_done(From, Metrics, Reqs, State) end; -handle_info({timeout, Tref, {req_timeout, From}}, State = #state{reqs = Reqs}) -> - Reqs2 = +handle_info({timeout, Tref, {req_timeout, From}}, + State = #state{reqs = Reqs, user_refs = URefs, caller_monitors = CMs}) -> + {Reqs2, URefs2, CMs2} = case maps:find(From, Reqs) of {ok, Tref} when is_reference(Tref) -> %% Sync path Error = #{code => operation_timedout, message => <<>>}, Metrics = [], _ = gen_server:reply(From, {error, {Error, Metrics}}), - maps:remove(From, Reqs); + {maps:remove(From, Reqs), URefs, CMs}; {ok, {Tref, {async, ReplyTo, UserRef}}} -> %% Async path Error = #{code => operation_timedout, message => <<>>}, ReplyTo ! {katipo_error, UserRef, Error}, - maps:remove(From, Reqs); + {maps:remove(From, Reqs), maps:remove(UserRef, URefs), + demonitor_caller(UserRef, CMs)}; error -> - Reqs + {Reqs, URefs, CMs} end, - {noreply, State#state{reqs = Reqs2}}; + {noreply, State#state{reqs = Reqs2, user_refs = URefs2, caller_monitors = CMs2}}; +handle_info({'DOWN', MonRef, process, _, _}, + State = #state{port = Port, user_refs = URefs, caller_monitors = CMs}) -> + case find_by_value(MonRef, CMs) of + {ok, UserRef} -> + %% Caller died during streaming body upload — finish the body + %% so curl can complete (and timeout/error) promptly + case maps:find(UserRef, URefs) of + {ok, {Pid, Ref}} -> + true = port_command(Port, term_to_binary({body_done, Pid, Ref})); + error -> + ok + end, + {noreply, State#state{caller_monitors = maps:remove(UserRef, CMs)}}; + error -> + {noreply, State} + end; handle_info({'EXIT', Port, Reason}, State = #state{port = Port}) -> logger:error("Port ~p died with reason: ~p", [Port, Reason]), {stop, port_died, State}. @@ -925,23 +1031,26 @@ terminate(_Reason, #state{port = Port}) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -handle_complete_response(Result, From, Response, Reqs, State) -> - case maps:find(From, Reqs) of +handle_complete_response(Result, From, Response, Reqs, + State = #state{user_refs = URefs, caller_monitors = CMs}) -> + {URefs2, CMs2} = case maps:find(From, Reqs) of {ok, Tref} when is_reference(Tref) -> _ = erlang:cancel_timer(Tref), - _ = gen_server:reply(From, {Result, Response}); + _ = gen_server:reply(From, {Result, Response}), + {URefs, CMs}; {ok, {Tref, {async, ReplyTo, UserRef}}} -> - {ResponseMap, _Metrics} = Response, + {ResponseMap, Metrics} = Response, _ = erlang:cancel_timer(Tref), case Result of - ok -> ReplyTo ! {katipo_response, UserRef, ResponseMap}; + ok -> ReplyTo ! {katipo_response, UserRef, ResponseMap#{metrics => Metrics}}; error -> ReplyTo ! {katipo_error, UserRef, ResponseMap} - end; + end, + {maps:remove(UserRef, URefs), demonitor_caller(UserRef, CMs)}; error -> - ok + {URefs, CMs} end, Reqs2 = maps:remove(From, Reqs), - {noreply, State#state{reqs = Reqs2}}. + {noreply, State#state{reqs = Reqs2, user_refs = URefs2, caller_monitors = CMs2}}. handle_stream_headers(From, Status, RawHeaders, CookieJar, Reqs, State) -> case maps:find(From, Reqs) of @@ -962,16 +1071,18 @@ handle_stream_chunk(From, Chunk, Reqs, State) -> end, {noreply, State}. -handle_stream_done(From, _Metrics, Reqs, State) -> - case maps:find(From, Reqs) of +handle_stream_done(From, _Metrics, Reqs, + State = #state{user_refs = URefs, caller_monitors = CMs}) -> + {URefs2, CMs2} = case maps:find(From, Reqs) of {ok, {Tref, {async, ReplyTo, UserRef}}} -> _ = erlang:cancel_timer(Tref), - ReplyTo ! {katipo_done, UserRef}; + ReplyTo ! {katipo_done, UserRef}, + {maps:remove(UserRef, URefs), demonitor_caller(UserRef, CMs)}; _ -> - ok + {URefs, CMs} end, Reqs2 = maps:remove(From, Reqs), - {noreply, State#state{reqs = Reqs2}}. + {noreply, State#state{reqs = Reqs2, user_refs = URefs2, caller_monitors = CMs2}}. send_to_port(Port, {Self, Ref}, #req{method = Method, @@ -1006,7 +1117,8 @@ send_to_port(Port, {Self, Ref}, dns_cache_timeout = DNSCacheTimeout, ca_cache_timeout = CACacheTimeout, pipewait = Pipewait, - streaming = Streaming}) -> + streaming = Streaming, + stream_body = StreamBody}) -> Opts = [{?CONNECTTIMEOUT_MS, ConnTimeoutMs}, {?FOLLOWLOCATION, FollowLocation}, {?SSL_VERIFYHOST, SslVerifyHost}, @@ -1034,7 +1146,8 @@ send_to_port(Port, {Self, Ref}, {?DNS_CACHE_TIMEOUT, DNSCacheTimeout}, {?CA_CACHE_TIMEOUT, CACacheTimeout}, {?PIPEWAIT, Pipewait}, - {?STREAMING, Streaming}], + {?STREAMING, Streaming}, + {?STREAM_BODY, StreamBody}], Command = {Self, Ref, Method, Url, Headers, CookieJar, Body, Opts}, true = port_command(Port, term_to_binary(Command)). @@ -1253,6 +1366,10 @@ opt(stream, true, {Req, Errors}) -> {Req#req{streaming = 1}, Errors}; opt(stream, false, {Req, Errors}) -> {Req#req{streaming = 0}, Errors}; +opt(stream_body, true, {Req, Errors}) -> + {Req#req{stream_body = 1}, Errors}; +opt(stream_body, false, {Req, Errors}) -> + {Req#req{stream_body = 0}, Errors}; opt(K, V, {Req, Errors}) -> {Req, [{K, V} | Errors]}. @@ -1275,6 +1392,23 @@ check_opts(Opts) when is_map(Opts) -> Error end. +demonitor_caller(UserRef, CMs) -> + case maps:take(UserRef, CMs) of + {MonRef, CMs2} -> + erlang:demonitor(MonRef, [flush]), + CMs2; + error -> + CMs + end. + +find_by_value(Val, Map) -> + maps:fold(fun(K, V, Acc) -> + case V of + Val -> {ok, K}; + _ -> Acc + end + end, error, Map). + error_map(Code, Message) when is_atom(Code) andalso is_binary(Message) -> #{code => Code, message => Message}; error_map(Code, Message) when is_atom(Code) -> diff --git a/test/katipo_SUITE.erl b/test/katipo_SUITE.erl index 3f48190..9088f7f 100644 --- a/test/katipo_SUITE.erl +++ b/test/katipo_SUITE.erl @@ -215,7 +215,18 @@ groups() -> async_stream_get, async_stream_headers_before_data, async_stream_error, - sync_stream_rejected]}, + async_metrics, + sync_stream_rejected, + stream_body_post, + stream_body_put, + stream_body_empty, + stream_body_caller_dies, + stream_body_worker_dies, + stream_body_double_finish, + stream_body_send_after_finish, + stream_body_with_response_stream, + stream_body_large, + sync_stream_body_rejected]}, {otel, [], [otel_span_created, otel_metrics_recorded, @@ -1202,7 +1213,8 @@ otel_url_sanitization(_Config) -> async_get(Config) -> {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), Url = httpbin_url(Config, <<"/get">>), - {ok, Ref} = katipo:async_get(?POOL, Url, Opts), + {ok, Handle} = katipo:async_get(?POOL, Url, Opts), + Ref = katipo:ref(Handle), receive {katipo_response, Ref, #{status := 200}} -> ok after 10000 -> @@ -1212,7 +1224,8 @@ async_get(Config) -> async_get_with_opts(Config) -> {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), Url = httpbin_url(Config, <<"/get?a=1">>), - {ok, Ref} = katipo:async_get(?POOL, Url, Opts), + {ok, Handle} = katipo:async_get(?POOL, Url, Opts), + Ref = katipo:ref(Handle), receive {katipo_response, Ref, #{status := 200, body := Body}} -> Json = jsx:decode(Body), @@ -1224,9 +1237,10 @@ async_get_with_opts(Config) -> async_post(Config) -> {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), Url = httpbin_url(Config, <<"/post">>), - {ok, Ref} = katipo:async_post(?POOL, Url, - Opts#{headers => [{<<"Content-Type">>, <<"application/json">>}], - body => <<"hello">>}), + {ok, Handle} = katipo:async_post(?POOL, Url, + Opts#{headers => [{<<"Content-Type">>, <<"application/json">>}], + body => <<"hello">>}), + Ref = katipo:ref(Handle), receive {katipo_response, Ref, #{status := 200, body := Body}} -> Json = jsx:decode(Body), @@ -1238,7 +1252,8 @@ async_post(Config) -> async_req(Config) -> {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), Url = httpbin_url(Config, <<"/get">>), - {ok, Ref} = katipo:async_req(?POOL, Opts#{url => Url, method => get}), + {ok, Handle} = katipo:async_req(?POOL, Opts#{url => Url, method => get}), + Ref = katipo:ref(Handle), receive {katipo_response, Ref, #{status := 200}} -> ok after 10000 -> @@ -1257,7 +1272,7 @@ async_reply_to(Config) -> Self ! async_reply_to_fail end end), - {ok, _Ref} = katipo:async_get(?POOL, Url, Opts#{reply_to => Pid}), + {ok, _Handle} = katipo:async_get(?POOL, Url, Opts#{reply_to => Pid}), receive async_reply_to_ok -> ok; async_reply_to_fail -> ct:fail(reply_to_timeout) @@ -1270,8 +1285,9 @@ async_error(_Config) -> katipo:async_get(?POOL, <<"https://localhost">>, #{bad_option => bad_value}). async_timeout(_Config) -> - {ok, Ref} = katipo:async_get(?POOL, <<"http://google.com">>, - #{connecttimeout_ms => 1}), + {ok, Handle} = katipo:async_get(?POOL, <<"http://google.com">>, + #{connecttimeout_ms => 1}), + Ref = katipo:ref(Handle), receive {katipo_error, Ref, #{code := operation_timedout}} -> ok after 10000 -> @@ -1281,38 +1297,38 @@ async_timeout(_Config) -> async_await(Config) -> {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), Url = httpbin_url(Config, <<"/get">>), - {ok, Ref} = katipo:async_get(?POOL, Url, Opts), - {ok, #{status := 200}} = katipo:await(Ref). + {ok, Handle} = katipo:async_get(?POOL, Url, Opts), + {ok, #{status := 200}} = katipo:await(Handle). async_await_timeout(_Config) -> %% Request itself times out — await collects the error - {ok, Ref} = katipo:async_get(?POOL, <<"http://google.com">>, - #{connecttimeout_ms => 1}), - {error, #{code := operation_timedout}} = katipo:await(Ref). + {ok, Handle} = katipo:async_get(?POOL, <<"http://google.com">>, + #{connecttimeout_ms => 1}), + {error, #{code := operation_timedout}} = katipo:await(Handle). async_await_explicit_timeout(Config) -> %% await/2 with an explicit timeout that is long enough {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), Url = httpbin_url(Config, <<"/get">>), - {ok, Ref} = katipo:async_get(?POOL, Url, Opts), - {ok, #{status := 200}} = katipo:await(Ref, 10000). + {ok, Handle} = katipo:async_get(?POOL, Url, Opts), + {ok, #{status := 200}} = katipo:await(Handle, 10000). async_await_own_timeout(_Config) -> %% await/2 timeout fires before the response arrives - {ok, Ref} = katipo:async_get(?POOL, <<"http://google.com">>, - #{timeout_ms => 30000, connecttimeout_ms => 30000}), - {error, #{code := await_timeout}} = katipo:await(Ref, 1). + {ok, Handle} = katipo:async_get(?POOL, <<"http://google.com">>, + #{timeout_ms => 30000, connecttimeout_ms => 30000}), + {error, #{code := await_timeout}} = katipo:await(Handle, 1). async_multiple_outstanding(Config) -> {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), Urls = [httpbin_url(Config, <<"/get?n=", (integer_to_binary(N))/binary>>) || N <- lists:seq(1, 5)], - Refs = [{N, begin - {ok, Ref} = katipo:async_get(?POOL, Url, Opts), - Ref - end} - || {N, Url} <- lists:zip(lists:seq(1, 5), Urls)], - Results = [{N, katipo:await(Ref)} || {N, Ref} <- Refs], + Handles = [{N, begin + {ok, Handle} = katipo:async_get(?POOL, Url, Opts), + Handle + end} + || {N, Url} <- lists:zip(lists:seq(1, 5), Urls)], + Results = [{N, katipo:await(Handle)} || {N, Handle} <- Handles], lists:foreach(fun({N, {ok, #{status := 200, body := Body}}}) -> Json = jsx:decode(Body), Expected = integer_to_binary(N), @@ -1322,7 +1338,8 @@ async_multiple_outstanding(Config) -> async_stream_get(Config) -> {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), Url = httpbin_url(Config, <<"/get?a=1">>), - {ok, Ref} = katipo:async_get(?POOL, Url, Opts#{stream => true}), + {ok, Handle} = katipo:async_get(?POOL, Url, Opts#{stream => true}), + Ref = katipo:ref(Handle), %% Collect status + headers {Status, Headers, CookieJar} = receive {katipo_response, Ref, {status, S, H, C}} -> {S, H, C} @@ -1339,7 +1356,8 @@ async_stream_get(Config) -> async_stream_headers_before_data(Config) -> {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), Url = httpbin_url(Config, <<"/bytes/1024">>), - {ok, Ref} = katipo:async_get(?POOL, Url, Opts#{stream => true}), + {ok, Handle} = katipo:async_get(?POOL, Url, Opts#{stream => true}), + Ref = katipo:ref(Handle), %% First message must be headers, not data receive {katipo_response, Ref, {status, 200, _Headers, _CookieJar}} -> ok; @@ -1351,8 +1369,9 @@ async_stream_headers_before_data(Config) -> _ = collect_stream_body(Ref). async_stream_error(_Config) -> - {ok, Ref} = katipo:async_get(?POOL, <<"https://localhost">>, - #{stream => true, connecttimeout_ms => 1}), + {ok, Handle} = katipo:async_get(?POOL, <<"https://localhost">>, + #{stream => true, connecttimeout_ms => 1}), + Ref = katipo:ref(Handle), receive {katipo_error, Ref, #{code := _}} -> ok after 10000 -> @@ -1372,8 +1391,165 @@ collect_stream_body(Ref, Acc) -> ct:fail(stream_timeout) end. +async_metrics(Config) -> + {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), + Url = httpbin_url(Config, <<"/get">>), + {ok, Handle} = katipo:async_get(?POOL, Url, Opts), + {ok, #{status := 200, metrics := Metrics}} = katipo:await(Handle, 10000), + ?assert(is_list(Metrics)), + ?assert(length(Metrics) > 0), + {total_time, TotalTime} = lists:keyfind(total_time, 1, Metrics), + ?assert(is_float(TotalTime) andalso TotalTime > 0). + sync_stream_rejected(Config) -> {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), Url = httpbin_url(Config, <<"/get">>), {error, #{code := bad_opts, message := <<"stream requires async_req">>}} = katipo:get(?POOL, Url, Opts#{stream => true}). + +%% Streaming request body tests + +stream_body_post(Config) -> + {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), + Url = httpbin_url(Config, <<"/post">>), + {ok, Handle} = katipo:async_post(?POOL, Url, + Opts#{headers => [{<<"Content-Type">>, <<"application/octet-stream">>}], + stream_body => true}), + ok = katipo:send_body(Handle, <<"chunk1">>), + ok = katipo:send_body(Handle, <<"chunk2">>), + ok = katipo:send_body(Handle, <<"chunk3">>), + ok = katipo:finish_body(Handle), + {ok, #{status := 200, body := Body}} = katipo:await(Handle, 10000), + Json = jsx:decode(Body), + ?assertEqual(<<"chunk1chunk2chunk3">>, maps:get(<<"data">>, Json)). + +stream_body_put(Config) -> + {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), + Url = httpbin_url(Config, <<"/put">>), + {ok, Handle} = katipo:async_put(?POOL, Url, + Opts#{headers => [{<<"Content-Type">>, <<"application/octet-stream">>}], + stream_body => true}), + ok = katipo:send_body(Handle, <<"put_data">>), + ok = katipo:finish_body(Handle), + {ok, #{status := 200, body := Body}} = katipo:await(Handle, 10000), + Json = jsx:decode(Body), + ?assertEqual(<<"put_data">>, maps:get(<<"data">>, Json)). + +stream_body_empty(Config) -> + {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), + Url = httpbin_url(Config, <<"/post">>), + {ok, Handle} = katipo:async_post(?POOL, Url, + Opts#{headers => [{<<"Content-Type">>, <<"application/octet-stream">>}], + stream_body => true}), + ok = katipo:finish_body(Handle), + {ok, #{status := 200, body := Body}} = katipo:await(Handle, 10000), + Json = jsx:decode(Body), + ?assertEqual(<<"">>, maps:get(<<"data">>, Json)). + +stream_body_caller_dies(Config) -> + {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), + Url = httpbin_url(Config, <<"/post">>), + Self = self(), + %% Spawn a process that starts a streaming upload and dies mid-stream. + %% reply_to defaults to self() inside the spawned process, so the worker + %% monitors the spawned process. When it dies, the worker auto-finishes + %% the body so the curl request can complete. + Pid = spawn(fun() -> + {ok, Handle} = katipo:async_post(?POOL, Url, + Opts#{headers => [{<<"Content-Type">>, <<"application/octet-stream">>}], + stream_body => true}), + ok = katipo:send_body(Handle, <<"partial">>), + Self ! {started, Handle} + %% Process exits here without calling finish_body + end), + _Handle = receive {started, H} -> H after 5000 -> ct:fail(no_handle) end, + MRef = monitor(process, Pid), + receive {'DOWN', MRef, process, Pid, _Reason} -> ok + after 5000 -> ct:fail(caller_didnt_die) + end, + %% The worker should detect the caller death and auto-finish the body. + %% Give it a moment to process the DOWN and complete the curl request, + %% then verify the pool is still healthy (not stuck on the dead upload). + timer:sleep(500), + {ok, #{status := 200}} = katipo:get(?POOL, httpbin_url(Config, <<"/get">>), Opts). + +stream_body_worker_dies(Config) -> + {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), + Url = httpbin_url(Config, <<"/post">>), + PoolName = stream_body_worker_dies_pool, + {ok, _} = katipo_pool:start(PoolName, 1), + {ok, Handle} = katipo:async_post(PoolName, Url, + Opts#{headers => [{<<"Content-Type">>, <<"application/octet-stream">>}], + stream_body => true}), + ok = katipo:send_body(Handle, <<"data">>), + %% Kill the only worker — the handle's monitor should fire + WorkerName = wpool_pool:random_worker(PoolName), + exit(whereis(WorkerName), kill), + {error, #{code := worker_died}} = katipo:await(Handle, 5000), + timer:sleep(500), + ok = katipo_pool:stop(PoolName). + +stream_body_double_finish(Config) -> + {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), + Url = httpbin_url(Config, <<"/post">>), + {ok, Handle} = katipo:async_post(?POOL, Url, + Opts#{headers => [{<<"Content-Type">>, <<"application/octet-stream">>}], + stream_body => true}), + ok = katipo:send_body(Handle, <<"data">>), + ok = katipo:finish_body(Handle), + ok = katipo:finish_body(Handle), + {ok, #{status := 200, body := Body}} = katipo:await(Handle, 10000), + Json = jsx:decode(Body), + ?assertEqual(<<"data">>, maps:get(<<"data">>, Json)). + +stream_body_send_after_finish(Config) -> + {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), + Url = httpbin_url(Config, <<"/post">>), + {ok, Handle} = katipo:async_post(?POOL, Url, + Opts#{headers => [{<<"Content-Type">>, <<"application/octet-stream">>}], + stream_body => true}), + ok = katipo:send_body(Handle, <<"data">>), + ok = katipo:finish_body(Handle), + %% Late send after finish — should be silently ignored + ok = katipo:send_body(Handle, <<"late_data">>), + {ok, #{status := 200, body := Body}} = katipo:await(Handle, 10000), + Json = jsx:decode(Body), + ?assertEqual(<<"data">>, maps:get(<<"data">>, Json)). + +stream_body_with_response_stream(Config) -> + {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), + Url = httpbin_url(Config, <<"/post">>), + {ok, Handle} = katipo:async_post(?POOL, Url, + Opts#{headers => [{<<"Content-Type">>, <<"application/octet-stream">>}], + stream_body => true, + stream => true}), + ok = katipo:send_body(Handle, <<"streamed">>), + ok = katipo:finish_body(Handle), + Ref = katipo:ref(Handle), + receive + {katipo_response, Ref, {status, 200, _Headers, _CookieJar}} -> ok + after 10000 -> ct:fail(no_headers) + end, + Body = collect_stream_body(Ref), + Json = jsx:decode(Body), + ?assertEqual(<<"streamed">>, maps:get(<<"data">>, Json)). + +stream_body_large(Config) -> + {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), + Url = httpbin_url(Config, <<"/post">>), + {ok, Handle} = katipo:async_post(?POOL, Url, + Opts#{headers => [{<<"Content-Type">>, <<"text/plain">>}], + stream_body => true}), + Chunk = binary:copy(<<"x">>, 4096), + lists:foreach(fun(_) -> ok = katipo:send_body(Handle, Chunk) end, lists:seq(1, 10)), + ok = katipo:finish_body(Handle), + {ok, #{status := 200, body := Body}} = katipo:await(Handle, 10000), + Json = jsx:decode(Body), + Expected = binary:copy(<<"x">>, 40960), + ?assertEqual(Expected, maps:get(<<"data">>, Json)). + +sync_stream_body_rejected(Config) -> + {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), + Url = httpbin_url(Config, <<"/post">>), + {error, #{code := bad_opts, message := <<"stream_body requires async_req">>}} = + katipo:post(?POOL, Url, Opts#{stream_body => true}). From 72a09deff9ae3b2d878eefde463a90bcedf26598 Mon Sep 17 00:00:00 2001 From: Paul Oliver Date: Sun, 22 Mar 2026 13:06:46 +1300 Subject: [PATCH 3/3] test: add PropEr statem tests for streaming body state machine Property-based stateful tests exercising random sequences of streaming uploads, non-streaming gets, timeouts, and recovery. The model tracks accumulated chunks per handle and verifies the server receives them in order with correct content. Covers interleaved streaming/non-streaming requests on the same pool, timeout-then-recovery (await before finish, then finish and re-await), and concurrent upload isolation across workers. --- rebar.config | 3 +- test/katipo_SUITE.erl | 9 ++ test/katipo_stream_body_statem.erl | 231 +++++++++++++++++++++++++++++ 3 files changed, 242 insertions(+), 1 deletion(-) create mode 100644 test/katipo_stream_body_statem.erl diff --git a/rebar.config b/rebar.config index 9217ea1..e9ddef9 100644 --- a/rebar.config +++ b/rebar.config @@ -29,7 +29,8 @@ {cowboy, "2.9.0"}, {ephemeral, "2.0.4"}, {opentelemetry, "1.5.0"}, - {opentelemetry_experimental, "0.5.1"} + {opentelemetry_experimental, "0.5.1"}, + {proper, "1.4.0"} ]}] }] }. diff --git a/test/katipo_SUITE.erl b/test/katipo_SUITE.erl index 9088f7f..e7ae995 100644 --- a/test/katipo_SUITE.erl +++ b/test/katipo_SUITE.erl @@ -226,6 +226,7 @@ groups() -> stream_body_send_after_finish, stream_body_with_response_stream, stream_body_large, + stream_body_statem, sync_stream_body_rejected]}, {otel, [], [otel_span_created, @@ -1548,6 +1549,14 @@ stream_body_large(Config) -> Expected = binary:copy(<<"x">>, 40960), ?assertEqual(Expected, maps:get(<<"data">>, Json)). +stream_body_statem(Config) -> + ct:timetrap({minutes, 3}), + {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), + BaseUrl = ?config(httpbin_base, Config), + true = proper:quickcheck( + katipo_stream_body_statem:prop_streaming(BaseUrl, Opts), + [{numtests, 50}, {max_size, 30}, {to_file, user}]). + sync_stream_body_rejected(Config) -> {req_opts, Opts} = lists:keyfind(req_opts, 1, Config), Url = httpbin_url(Config, <<"/post">>), diff --git a/test/katipo_stream_body_statem.erl b/test/katipo_stream_body_statem.erl new file mode 100644 index 0000000..56ee3f2 --- /dev/null +++ b/test/katipo_stream_body_statem.erl @@ -0,0 +1,231 @@ +-module(katipo_stream_body_statem). + +-behaviour(proper_statem). + +-include_lib("proper/include/proper.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-export([prop_streaming/2]). + +-export([initial_state/0, + command/1, + precondition/2, + postcondition/3, + next_state/3]). + +-export([start_upload/0, + start_get/0, + send_chunk/2, + finish_upload/1, + await_response/1, + await_uploading/1]). + +%% Generator for HTTP-safe chunk data (ASCII alphanumeric) +chunk_data() -> + ?LET(Chars, non_empty(list(oneof([range($a, $z), range($0, $9)]))), + list_to_binary(Chars)). + +-define(POOL, katipo_test_pool). + +-record(handle_info, { + kind :: streaming | get, + status :: uploading | finished | timed_out, + chunks = [] :: [binary()] %% in send order (streaming only) +}). + +-record(state, { + handles = #{} :: #{term() => #handle_info{}} +}). + +%% --- Property --- + +prop_streaming(BaseUrl, Opts) -> + put(stream_base_url, BaseUrl), + put(stream_opts, Opts), + ?FORALL(Cmds, commands(?MODULE), + begin + {_H, S, Res} = run_commands(?MODULE, Cmds), + cleanup(S), + ?WHENFAIL( + ct:pal("Commands: ~p~nFinal state: ~p~nResult: ~p", + [Cmds, S, Res]), + aggregate(command_names(Cmds), Res =:= ok)) + end). + +%% --- State machine callbacks --- + +initial_state() -> + #state{}. + +command(#state{handles = Handles}) -> + Uploading = [V || {V, #handle_info{kind = streaming, status = uploading}} + <- maps:to_list(Handles)], + Finished = [V || {V, #handle_info{status = finished}} + <- maps:to_list(Handles)], + TimedOut = [V || {V, #handle_info{kind = streaming, status = timed_out}} + <- maps:to_list(Handles)], + frequency( + [{4, {call, ?MODULE, start_upload, []}}] ++ + [{3, {call, ?MODULE, start_get, []}}] ++ + [{3, {call, ?MODULE, send_chunk, + [elements(Uploading), chunk_data()]}} + || Uploading =/= []] ++ + [{3, {call, ?MODULE, finish_upload, + [elements(Uploading)]}} + || Uploading =/= []] ++ + [{4, {call, ?MODULE, await_response, + [elements(Finished)]}} + || Finished =/= []] ++ + %% Timeout: await before body is finished + [{2, {call, ?MODULE, await_uploading, + [elements(Uploading)]}} + || Uploading =/= []] ++ + %% Recovery: finish a timed-out upload so it becomes awaitable + [{2, {call, ?MODULE, finish_upload, + [elements(TimedOut)]}} + || TimedOut =/= []] ++ + %% Edge cases: double finish, send after finish + [{1, {call, ?MODULE, finish_upload, + [elements(Finished)]}} + || Finished =/= []] ++ + [{1, {call, ?MODULE, send_chunk, + [elements(Finished), chunk_data()]}} + || Finished =/= []] + ). + +precondition(_, _) -> + true. + +next_state(S, V, {call, _, start_upload, []}) -> + Info = #handle_info{kind = streaming, status = uploading}, + S#state{handles = maps:put(V, Info, S#state.handles)}; +next_state(S, V, {call, _, start_get, []}) -> + Info = #handle_info{kind = get, status = finished}, + S#state{handles = maps:put(V, Info, S#state.handles)}; +next_state(S, _V, {call, _, finish_upload, [Handle]}) -> + case maps:get(Handle, S#state.handles, undefined) of + #handle_info{status = Status} = Info + when Status =:= uploading; Status =:= timed_out -> + S#state{handles = maps:put(Handle, + Info#handle_info{status = finished}, + S#state.handles)}; + _ -> + S + end; +next_state(S, _V, {call, _, await_response, [Handle]}) -> + S#state{handles = maps:remove(Handle, S#state.handles)}; +next_state(S, _V, {call, _, await_uploading, [Handle]}) -> + case maps:get(Handle, S#state.handles, undefined) of + #handle_info{} = Info -> + S#state{handles = maps:put(Handle, + Info#handle_info{status = timed_out}, + S#state.handles)}; + _ -> + S + end; +next_state(S, _V, {call, _, send_chunk, [Handle, Data]}) -> + case maps:get(Handle, S#state.handles, undefined) of + #handle_info{status = uploading, chunks = Chunks} = Info -> + S#state{handles = maps:put(Handle, + Info#handle_info{chunks = Chunks ++ [Data]}, + S#state.handles)}; + _ -> + S + end. + +%% --- Postconditions --- +%% +%% PropEr postconditions must return true | false (not crash), so we +%% use ct:pal for diagnostics rather than ct assertion macros. + +postcondition(_S, {call, _, start_upload, []}, {ok, _}) -> true; +postcondition(_S, {call, _, start_get, []}, {ok, _}) -> true; +postcondition(_S, {call, _, send_chunk, _}, ok) -> true; +postcondition(_S, {call, _, finish_upload, _}, ok) -> true; + +postcondition(_S, {call, _, await_uploading, _}, {error, #{code := await_timeout}}) -> + true; + +postcondition(S, {call, _, await_response, [Handle]}, + {ok, #{status := 200, metrics := Metrics} = Response}) -> + HasMetrics = is_list(Metrics) andalso length(Metrics) > 0, + case maps:get(Handle, S#state.handles, undefined) of + #handle_info{kind = get} -> + HasMetrics; + #handle_info{kind = streaming, chunks = Chunks} -> + Expected = iolist_to_binary(Chunks), + Body = maps:get(body, Response), + Json = jsx:decode(Body), + Actual = maps:get(<<"data">>, Json, <<>>), + case {HasMetrics, Actual =:= Expected} of + {true, true} -> + true; + _ -> + ct:pal("body mismatch or missing metrics~n" + " expected body: ~p~n" + " actual body: ~p~n" + " has_metrics: ~p", + [Expected, Actual, HasMetrics]), + false + end; + undefined -> + ct:pal("handle not found in model state"), + false + end; + +%% Catch-all failure clause with diagnostics +postcondition(_S, {call, _, Fn, _}, Other) -> + ct:pal("~p returned unexpected: ~p", [Fn, Other]), + false. + +%% --- Command implementations --- + +start_upload() -> + BaseUrl = get(stream_base_url), + Opts = get(stream_opts), + katipo:async_post(?POOL, <>, + Opts#{headers => [{<<"Content-Type">>, <<"text/plain">>}], + stream_body => true}). + +start_get() -> + BaseUrl = get(stream_base_url), + Opts = get(stream_opts), + katipo:async_get(?POOL, <>, Opts). + +send_chunk({ok, Handle}, Data) -> + katipo:send_body(Handle, Data). + +finish_upload({ok, Handle}) -> + katipo:finish_body(Handle). + +await_response({ok, Handle}) -> + katipo:await(Handle, 10000). + +await_uploading({ok, Handle}) -> + katipo:await(Handle, 1). + +%% --- Cleanup --- + +cleanup(#state{handles = Handles}) -> + maps:foreach(fun(HandleResult, #handle_info{kind = Kind, status = Status}) -> + case HandleResult of + {ok, Handle} -> + case {Kind, Status} of + {streaming, uploading} -> + katipo:finish_body(Handle), + katipo:await(Handle, 5000); + {streaming, finished} -> + katipo:await(Handle, 5000); + {streaming, timed_out} -> + katipo:finish_body(Handle), + katipo:await(Handle, 5000); + {get, finished} -> + katipo:await(Handle, 5000); + _ -> + ok + end; + _ -> + ok + end + end, Handles). +