Skip to content

Commit 1fc4346

Browse files
committed
feat: switch to Mint as the default HTTP producer
Mint uses a no-process architecture, which fits better with Concentrate's one-process-per-HTTP-endpoint model.
1 parent cc659da commit 1fc4346

5 files changed

Lines changed: 727 additions & 1 deletion

File tree

config/config.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,6 @@ config :concentrate,
7373
file_tap: [
7474
enabled?: false
7575
],
76-
http_producer: Concentrate.Producer.HTTPoison
76+
http_producer: Concentrate.Producer.Mint
7777

7878
import_config "#{Mix.env()}.exs"

lib/concentrate/producer/mint.ex

Lines changed: 374 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,374 @@
1+
defmodule Concentrate.Producer.Mint do
  @moduledoc """
  HTTP producer implementation using Mint.

  The producer is started with `{url, opts}`. Recognised options:

    * `:name` - passed through to `GenStage.start_link/3`
    * `:parser` - required; a module (called as `module.parse(body, [])`), a
      `{module, parse_opts}` tuple, or a 1-arity function
    * `:fetch_after` - milliseconds to wait before fetching again
      (default: #{5_000})
    * `:get_opts` - keyword list; only `:timeout` and `:send_timeout` are used,
      as Mint transport options (`:timeout` also bounds each fetch)
    * `:headers` - extra request headers, merged over the defaults

  Each successfully parsed `200` response is emitted as one event and consumes
  one unit of demand. `Last-Modified`/`ETag` response headers are remembered and
  sent back as conditional request headers on later fetches.
  """
  use GenStage
  require Logger
  alias Mint.HTTP

  @start_link_opts ~w(name)a
  @default_fetch_after 5_000
  @default_timeout 30_000
  @default_transport_opts [timeout: @default_timeout]
  @default_headers %{"accept-encoding" => "gzip"}
  @permanent_redirects [301, 308]
  @redirects [301, 302, 303, 307, 308]

  defmodule State do
    @moduledoc false

    defstruct [
      :url,
      :parser,
      fetch_after: nil,
      opts: %{},
      transport_opts: [],
      headers: %{},
      conn: :not_connected,
      ref: :not_connected,
      demand: 0,
      response: nil,
      events: []
    ]
  end

  alias __MODULE__.State

  @doc """
  Starts the producer for `url`. See the module documentation for `opts`.
  """
  def start_link({url, opts}) when is_binary(url) and is_list(opts) do
    {start_link_opts, opts} = Keyword.split(opts, @start_link_opts)
    GenStage.start_link(__MODULE__, {url, opts}, start_link_opts)
  end

  @impl GenStage
  def init({url, opts}) do
    opts = Map.new(opts)

    parser =
      case Map.fetch!(opts, :parser) do
        module when is_atom(module) ->
          &module.parse(&1, [])

        {module, parser_opts} when is_atom(module) and is_list(parser_opts) ->
          &module.parse(&1, parser_opts)

        fun when is_function(fun, 1) ->
          fun
      end

    # Merge over the defaults so a `:get_opts` without `:timeout` still yields
    # a usable fetch timeout (a nil timeout would crash Process.send_after/3).
    user_transport_opts =
      opts
      |> Map.get(:get_opts, [])
      |> Keyword.take(~w(timeout send_timeout)a)

    transport_opts = Keyword.merge(@default_transport_opts, user_transport_opts)

    # Map.new/1 accepts both a map and a list of {name, value} tuples.
    headers = Map.merge(@default_headers, Map.new(Map.get(opts, :headers, %{})))

    state = %State{
      url: url,
      parser: parser,
      fetch_after: Map.get(opts, :fetch_after, @default_fetch_after),
      transport_opts: transport_opts,
      headers: headers,
      opts: opts
    }

    {
      :producer,
      state,
      dispatcher: GenStage.BroadcastDispatcher
    }
  end

  @impl GenStage
  def handle_demand(new_demand, %{demand: existing_demand} = state) do
    state = %{state | demand: new_demand + existing_demand}

    # Only kick off a request when we were idle; with outstanding demand a
    # fetch is already in flight or scheduled.
    state =
      if existing_demand == 0 do
        make_request(state, state.url)
      else
        state
      end

    {:noreply, [], state}
  end

  @impl GenStage
  def handle_info({:fetch, url}, state) do
    state =
      if state.demand > 0 do
        make_request(state, url)
      else
        state
      end

    {:noreply, [], state}
  end

  # The timeout only applies when its ref is still the in-flight request.
  def handle_info({:fetch_timeout, ref}, %{ref: ref} = state) do
    log_message(:warn, state, fn -> "fetch timed out, disconnecting" end)

    _ =
      if state.conn != :not_connected and HTTP.open?(state.conn) do
        HTTP.close(state.conn)
      end

    state = fetch_again!(%{state | conn: :not_connected, ref: nil})
    {:noreply, [], state}
  end

  # Timeout for a request which already finished or was replaced: ignore.
  def handle_info({:fetch_timeout, _}, state) do
    {:noreply, [], state}
  end

  # Socket messages can still be in the mailbox after the connection was
  # closed (e.g. following a timeout). There is no conn to stream them into.
  def handle_info(_message, %{conn: :not_connected} = state) do
    {:noreply, [], state}
  end

  def handle_info(message, state) do
    state =
      case HTTP.stream(state.conn, message) do
        {:ok, conn, responses} ->
          Enum.reduce(responses, %{state | conn: conn}, &handle_responses/2)

        {:error, conn, error, _responses} ->
          log_message(:warn, state, fn -> "HTTP error error=#{inspect(error)}" end)
          fetch_again!(%{state | conn: conn, ref: nil})

        :unknown ->
          log_message(:warn, state, fn -> "unknown message message=#{inspect(message)}" end)
          state
      end

    # events are prepended as they are parsed, so reverse to emit in order
    {:noreply, Enum.reverse(state.events), %{state | events: []}}
  end

  @doc false
  # Folds a single Mint response part into the state. Parts whose ref does not
  # match the in-flight request fall through to the last clause and are logged.
  def handle_responses({:status, ref, status}, %{ref: ref} = state) do
    %{state | response: {status, [], []}}
  end

  def handle_responses({:headers, ref, headers}, %{ref: ref} = state) do
    {status, old_headers, body} = state.response

    cache_headers =
      Enum.reduce(headers, state.headers, fn {header, value}, acc ->
        case String.downcase(header) do
          "last-modified" ->
            Map.put(acc, "if-modified-since", value)

          "etag" ->
            Map.put(acc, "if-none-match", value)

          _ ->
            acc
        end
      end)

    # don't use if-none-match if we already have if-modified-since
    cache_headers =
      case cache_headers do
        %{"if-modified-since" => _, "if-none-match" => _} ->
          Map.delete(cache_headers, "if-none-match")

        _ ->
          cache_headers
      end

    %{state | response: {status, headers ++ old_headers, body}, headers: cache_headers}
  end

  def handle_responses({:data, ref, data}, %{ref: ref} = state) do
    {status, headers, body} = state.response

    # An improper list with a binary tail is valid iodata, which makes
    # appending each chunk O(1); it is flattened once in decode_body/2.
    %{state | response: {status, headers, [body | data]}}
  end

  def handle_responses({:done, ref}, %{ref: ref} = state) do
    {status, headers, body} = state.response

    # The request is complete: clear the ref so the still-pending
    # {:fetch_timeout, ref} timer cannot tear down a healthy connection.
    handle_http_response(%{state | ref: nil}, status, headers, body)
  end

  def handle_responses(response, state) do
    log_message(:warn, state, fn ->
      "unexpected response=#{inspect(response)}"
    end)

    state
  end

  @doc false
  # (Re)connects and issues a GET for `url`. If connecting or sending fails,
  # another fetch is scheduled instead of crashing the producer.
  def make_request(state, url) do
    case connect(state, url) do
      %{conn: :not_connected} = state ->
        fetch_again!(state)

      state ->
        {_, _, _, path} = parse_url(url)
        send_request(state, path)
    end
  end

  defp send_request(state, path) do
    case HTTP.request(state.conn, "GET", path, Map.to_list(state.headers), "") do
      {:ok, conn, ref} ->
        _ = schedule_timeout(ref, Keyword.get(state.transport_opts, :timeout, @default_timeout))
        %{state | conn: conn, ref: ref}

      {:error, conn, error} ->
        log_message(:warn, state, fn -> "HTTP error error=#{inspect(error)}" end)
        fetch_again!(%{state | conn: conn, ref: nil})
    end
  end

  # Process.send_after/3 only accepts integer delays; anything else
  # (e.g. :infinity) means "no fetch timeout".
  defp schedule_timeout(ref, timeout) when is_integer(timeout) and timeout >= 0 do
    Process.send_after(self(), {:fetch_timeout, ref}, timeout)
  end

  defp schedule_timeout(_ref, _timeout), do: :ok

  defp connect(%{conn: :not_connected} = state, url) do
    {scheme, host, port, _} = parse_url(url)

    case HTTP.connect(scheme, host, port,
           transport_opts: state.transport_opts,
           protocols: [:http1]
         ) do
      {:ok, conn} ->
        %{state | conn: conn}

      {:error, error} ->
        log_message(:warn, state, fn -> "connect error error=#{inspect(error)}" end)
        state
    end
  end

  defp connect(state, url) do
    # NOTE(review): reuse of an open connection (checking HTTP.open?/1) appears
    # to have been disabled on purpose; every request uses a fresh connection.
    state
    |> disconnect()
    |> connect(url)
  end

  defp disconnect(%{conn: conn} = state) do
    _ = HTTP.close(conn)
    %{state | conn: :not_connected, ref: :not_connected}
  end

  # 200: decode + parse the body, emit it as one event, consume one demand.
  # A parser failure (raise or throw) is logged and the fetch is retried.
  defp handle_http_response(state, 200, headers, body) do
    body = decode_body(body, find_header(headers, "content-encoding"))
    {time, parsed} = :timer.tc(state.parser, [body])

    log_message(:info, state, fn ->
      "updated: records=#{length(parsed)} time=#{time / 1000}"
    end)

    state = %{
      state
      | events: [parsed | state.events],
        demand: max(state.demand - 1, 0),
        response: nil
    }

    fetch_again!(state)
  rescue
    error ->
      state = log_parse_error(error, state, __STACKTRACE__)
      fetch_again!(state)
  catch
    error ->
      state = log_parse_error(error, state, __STACKTRACE__)
      fetch_again!(state)
  end

  # Redirects: permanent ones replace the stored URL, temporary ones only
  # affect the next fetch. Either way we drop the connection and refetch now.
  defp handle_http_response(state, redirect, headers, _body) when redirect in @redirects do
    case find_header(headers, "location") do
      {:ok, location} ->
        state = disconnect(state)

        if redirect in @permanent_redirects do
          fetch_again!(%{state | url: location}, fetch_after: 0)
        else
          fetch_again!(state, url: location, fetch_after: 0)
        end

      :error ->
        log_message(:warn, state, fn -> "redirect without location status=#{redirect}" end)
        fetch_again!(state)
    end
  end

  defp handle_http_response(state, 304, _headers, _body) do
    log_message(:info, state, fn -> "not modified status=304" end)
    fetch_again!(state)
  end

  defp handle_http_response(state, 404, _headers, _body) do
    log_message(:warn, state, fn -> "not found status=404" end)
    fetch_again!(state)
  end

  # Any other status (e.g. 5xx): log and retry rather than crash the stage.
  defp handle_http_response(state, status, _headers, _body) do
    log_message(:warn, state, fn -> "unexpected status=#{inspect(status)}" end)
    fetch_again!(state)
  end

  defp decode_body(body, {:ok, "gzip"}) do
    :zlib.gunzip(body)
  end

  defp decode_body(body, _) do
    IO.iodata_to_binary(body)
  end

  # Schedules the next fetch, but only while there is outstanding demand.
  # `opts` may override the `:url` and `:fetch_after` for this one fetch.
  defp fetch_again!(state, opts \\ []) do
    _ =
      if state.demand > 0 do
        url = Keyword.get(opts, :url, state.url)
        fetch_after = Keyword.get(opts, :fetch_after, state.fetch_after)
        Process.send_after(self(), {:fetch, url}, fetch_after)
      end

    state
  end

  @doc """
  Finds the first header whose downcased name equals `query`.

  `query` must already be lower-case. Returns `{:ok, value}` or `:error`.
  """
  def find_header(headers, query) do
    value =
      Enum.find_value(headers, fn {header, value} ->
        if String.downcase(header) == query do
          value
        else
          nil
        end
      end)

    if value do
      {:ok, value}
    else
      :error
    end
  end

  defp log_parse_error(error, machine, trace) do
    _ =
      Logger.error(fn ->
        "#{__MODULE__}: parse error url=#{machine.url} error=#{inspect(error)}\n" <>
          Exception.format_stacktrace(trace)
      end)

    machine
  end

  @doc """
  Parse URL into the pieces needed for connecting to Mint.

  A URL without a path uses `"/"` as the request path.

      iex> parse_url("https://mbta.com/developers")
      {:https, "mbta.com", 443, "/developers"}

      iex> parse_url("http://localhost:8080/path?query=string#fragment")
      {:http, "localhost", 8080, "/path?query=string"}

      iex> parse_url("http://localhost:8080?query=string")
      {:http, "localhost", 8080, "/?query=string"}
  """
  def parse_url(url) when is_binary(url) do
    uri = URI.parse(url)

    scheme =
      case uri.scheme do
        "https" -> :https
        "http" -> :http
        nil -> :http
      end

    # An empty request target is invalid in HTTP/1.1; default to the root path.
    base_path = if uri.path in [nil, ""], do: "/", else: uri.path

    path =
      if uri.query do
        "#{base_path}?#{uri.query}"
      else
        base_path
      end

    {scheme, uri.host, uri.port, path}
  end

  defp log_message(level, state, log_fn) do
    _ =
      Logger.log(level, fn ->
        "#{__MODULE__} #{log_fn.()} url=#{inspect(state.url)}"
      end)

    :ok
  end
end

mix.exs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ defmodule Concentrate.MixProject do
7171
{:gen_stage, "~> 1.0"},
7272
{:gpb, "~> 4.7", only: :dev, runtime: false, only: :dev},
7373
{:httpoison, "~> 1.0"},
74+
{:mint, "~> 1.0"},
75+
{:castore, "~> 0.1"},
7476
{:jason, "~> 1.0"},
7577
{:stream_data, "~> 0.4", only: :test},
7678
{:tzdata, "~> 1.1.1"}

0 commit comments

Comments
 (0)