Skip to content

Commit 276e319

Browse files
committed
wip
1 parent 1963042 commit 276e319

3 files changed

Lines changed: 694 additions & 1 deletion

File tree

config/config.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,6 @@ config :concentrate,
5959
file_tap: [
6060
enabled?: false
6161
],
62-
http_producer: Concentrate.Producer.HTTPoison
62+
http_producer: Concentrate.Producer.Mint
6363

6464
import_config "#{Mix.env()}.exs"

lib/concentrate/producer/mint.ex

Lines changed: 375 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,375 @@
1+
defmodule Concentrate.Producer.Mint do
2+
@moduledoc """
3+
HTTP producer implementation using Mint.
4+
"""
5+
use GenStage
6+
require Logger
7+
alias Mint.HTTP
8+
@start_link_opts ~w(name)a
9+
@default_fetch_after 5_000
10+
@default_timeout 30_000
11+
@default_transport_opts [timeout: @default_timeout]
12+
@default_headers %{"accept-encoding" => "gzip"}
13+
14+
defmodule State do
15+
@moduledoc false
16+
17+
defstruct [
18+
:url,
19+
:parser,
20+
fetch_after: nil,
21+
opts: %{},
22+
transport_opts: [],
23+
headers: %{},
24+
conn: :not_connected,
25+
ref: :not_connected,
26+
demand: 0,
27+
response: nil,
28+
events: []
29+
]
30+
end
31+
32+
alias __MODULE__.State
33+
34+
def start_link({url, opts}) when is_binary(url) and is_list(opts) do
35+
start_link_opts = Keyword.take(opts, @start_link_opts)
36+
opts = Keyword.drop(opts, @start_link_opts)
37+
GenStage.start_link(__MODULE__, {url, opts}, start_link_opts)
38+
end
39+
40+
@impl GenStage
41+
def init({url, opts}) do
42+
opts = Map.new(opts)
43+
44+
parser =
45+
case Map.fetch!(opts, :parser) do
46+
module when is_atom(module) ->
47+
&module.parse(&1, [])
48+
49+
{module, opts} when is_atom(module) and is_list(opts) ->
50+
&module.parse(&1, opts)
51+
52+
fun when is_function(fun, 1) ->
53+
fun
54+
end
55+
56+
state = %State{
57+
url: url,
58+
parser: parser,
59+
fetch_after: Map.get(opts, :fetch_after, @default_fetch_after),
60+
transport_opts:
61+
Keyword.take(Map.get(opts, :get_opts, @default_transport_opts), ~w(timeout send_timeout)a),
62+
headers: Map.merge(@default_headers, Map.get(opts, :headers, %{})),
63+
opts: opts
64+
}
65+
66+
{
67+
:producer,
68+
state,
69+
dispatcher: GenStage.BroadcastDispatcher
70+
}
71+
end
72+
73+
@impl GenStage
74+
def handle_demand(new_demand, %{demand: existing_demand} = state) do
75+
demand = new_demand + existing_demand
76+
state = %{state | demand: demand}
77+
78+
state =
79+
if existing_demand == 0 do
80+
make_request(state, state.url)
81+
else
82+
state
83+
end
84+
85+
{:noreply, [], state}
86+
end
87+
88+
@impl GenStage
89+
def handle_info({:fetch, url}, state) do
90+
state =
91+
if state.demand > 0 do
92+
make_request(state, url)
93+
else
94+
state
95+
end
96+
97+
{:noreply, [], state}
98+
end
99+
100+
def handle_info({:fetch_timeout, ref}, %{ref: ref} = state) do
101+
log_message(:warn, state, fn -> "fetch timed out, disconnecting" end)
102+
103+
_ =
104+
if state.conn != :not_connected and HTTP.open?(state.conn) do
105+
HTTP.close(state.conn)
106+
end
107+
108+
state = %{state | conn: :not_connected, ref: nil}
109+
state = fetch_again!(state)
110+
{:noreply, [], state}
111+
end
112+
113+
def handle_info({:fetch_timeout, _}, state) do
114+
{:noreply, [], state}
115+
end
116+
117+
def handle_info(message, state) do
118+
state =
119+
case HTTP.stream(state.conn, message) do
120+
{:ok, conn, responses} ->
121+
Enum.reduce(responses, %{state | conn: conn}, &handle_responses/2)
122+
123+
{:error, conn, error, _responses} ->
124+
log_message(:warn, state, fn ->
125+
"HTTP error error=#{inspect(error)}
126+
"
127+
end)
128+
129+
state = %{state | conn: conn, ref: nil}
130+
fetch_again!(state)
131+
132+
:unknown ->
133+
log_message(:warn, state, fn -> "unknown message message=#{inspect(message)}" end)
134+
state
135+
end
136+
137+
{:noreply, Enum.reverse(state.events), %{state | events: []}}
138+
end
139+
140+
def handle_responses({:status, ref, status}, %{ref: ref} = state) do
141+
%{state | response: {status, [], []}}
142+
end
143+
144+
def handle_responses({:headers, ref, headers}, %{ref: ref} = state) do
145+
{status, old_headers, body} = state.response
146+
147+
cache_headers =
148+
Enum.reduce(headers, state.headers, fn {header, value}, acc ->
149+
case String.downcase(header) do
150+
"last-modified" ->
151+
Map.put(acc, "if-modified-since", value)
152+
153+
"etag" ->
154+
Map.put(acc, "if-none-match", value)
155+
156+
_ ->
157+
acc
158+
end
159+
end)
160+
161+
# don't use if-none-match if we already have if-modified-since
162+
cache_headers =
163+
case cache_headers do
164+
%{"if-modified-since" => _, "if-none-match" => _} ->
165+
Map.delete(cache_headers, "if-none-match")
166+
167+
_ ->
168+
cache_headers
169+
end
170+
171+
%{state | response: {status, headers ++ old_headers, body}, headers: cache_headers}
172+
end
173+
174+
def handle_responses({:data, ref, data}, %{ref: ref} = state) do
175+
{status, headers, body} = state.response
176+
177+
%{state | response: {status, headers, [body | data]}}
178+
end
179+
180+
def handle_responses({:done, ref}, %{ref: ref} = state) do
181+
{status, headers, body} = state.response
182+
handle_http_response(state, status, headers, body)
183+
end
184+
185+
def handle_responses(response, state) do
186+
log_message(:warn, state, fn ->
187+
"unexpected response=#{inspect(response)}"
188+
end)
189+
190+
state
191+
end
192+
193+
def make_request(state, url) do
194+
state = connect(state, url)
195+
196+
if state.conn != :not_connected do
197+
{_, _, _, path} = parse_url(url)
198+
{:ok, conn, ref} = HTTP.request(state.conn, "GET", path, Map.to_list(state.headers))
199+
200+
Process.send_after(
201+
self(),
202+
{:fetch_timeout, ref},
203+
Keyword.get(state.transport_opts, :timeout)
204+
)
205+
206+
%{state | conn: conn, ref: ref}
207+
else
208+
fetch_again!(state)
209+
end
210+
end
211+
212+
defp connect(%{conn: :not_connected} = state, url) do
213+
{scheme, host, port, _} = parse_url(url)
214+
215+
case HTTP.connect(scheme, host, port,
216+
transport_opts: state.transport_opts,
217+
protocols: [:http1]
218+
) do
219+
{:ok, conn} ->
220+
%{state | conn: conn}
221+
222+
{:error, _} ->
223+
state
224+
end
225+
end
226+
227+
defp connect(state, url) do
228+
# if HTTP.open?(conn) do
229+
# state
230+
# else
231+
state
232+
|> disconnect()
233+
|> connect(url)
234+
235+
# end
236+
end
237+
238+
defp disconnect(%{conn: conn} = state) do
239+
_ = HTTP.close(conn)
240+
%{state | conn: :not_connected, ref: :not_connected}
241+
end
242+
243+
defp handle_http_response(state, 200, headers, body) do
244+
body = decode_body(body, find_header(headers, "content-encoding"))
245+
{time, parsed} = :timer.tc(state.parser, [body])
246+
247+
log_message(:info, state, fn ->
248+
"updated: records=#{length(parsed)} time=#{time / 1000}"
249+
end)
250+
251+
state = %{
252+
state
253+
| events: [parsed | state.events],
254+
demand: max(state.demand - 1, 0),
255+
response: nil
256+
}
257+
258+
fetch_again!(state)
259+
rescue
260+
error ->
261+
state = log_parse_error(error, state, System.stacktrace())
262+
fetch_again!(state)
263+
catch
264+
error ->
265+
state = log_parse_error(error, state, System.stacktrace())
266+
fetch_again!(state)
267+
end
268+
269+
defp handle_http_response(state, redirect, headers, _body) when redirect in [301, 302] do
270+
{:ok, location} = find_header(headers, "location")
271+
state = disconnect(state)
272+
273+
if redirect == 301 do
274+
state = %{state | url: location}
275+
fetch_again!(state, fetch_after: 0)
276+
else
277+
fetch_again!(state, url: location, fetch_after: 0)
278+
end
279+
end
280+
281+
defp handle_http_response(state, 304, _headers, _body) do
282+
log_message(:info, state, fn -> "not modified status=304" end)
283+
fetch_again!(state)
284+
end
285+
286+
defp handle_http_response(state, 404, _headers, _body) do
287+
log_message(:warn, state, fn -> "not found status=404" end)
288+
fetch_again!(state)
289+
end
290+
291+
defp decode_body(body, {:ok, "gzip"}) do
292+
:zlib.gunzip(body)
293+
end
294+
295+
defp decode_body(body, _) do
296+
IO.iodata_to_binary(body)
297+
end
298+
299+
defp fetch_again!(state, opts \\ []) do
300+
_ =
301+
if state.demand > 0 do
302+
url = Keyword.get(opts, :url, state.url)
303+
fetch_after = Keyword.get(opts, :fetch_after, state.fetch_after)
304+
Process.send_after(self(), {:fetch, url}, fetch_after)
305+
end
306+
307+
state
308+
end
309+
310+
def find_header(headers, query) do
311+
value =
312+
Enum.find_value(headers, fn {header, value} ->
313+
if String.downcase(header) == query do
314+
value
315+
else
316+
nil
317+
end
318+
end)
319+
320+
if value do
321+
{:ok, value}
322+
else
323+
:error
324+
end
325+
end
326+
327+
defp log_parse_error(error, machine, trace) do
328+
_ =
329+
Logger.error(fn ->
330+
"#{__MODULE__}: parse error url=#{machine.url} error=#{inspect(error)}\n#{
331+
Exception.format_stacktrace(trace)
332+
}"
333+
end)
334+
335+
machine
336+
end
337+
338+
@doc """
339+
Parse URL into the pieces needed for connecting to Mint.
340+
341+
iex> parse_url("https://mbta.com/developers")
342+
{:https, "mbta.com", 443, "/developers"}
343+
344+
iex> parse_url("http://localhost:8080/path?query=string#fragement")
345+
{:http, "localhost", 8080, "/path?query=string"}
346+
"""
347+
def parse_url(url) when is_binary(url) do
348+
uri = URI.parse(url)
349+
350+
scheme =
351+
case uri.scheme do
352+
"https" -> :https
353+
"http" -> :http
354+
nil -> :http
355+
end
356+
357+
path =
358+
if uri.query do
359+
"#{uri.path}?#{uri.query}"
360+
else
361+
uri.path
362+
end
363+
364+
{scheme, uri.host, uri.port, path}
365+
end
366+
367+
defp log_message(level, state, log_fn) do
368+
_ =
369+
Logger.log(level, fn ->
370+
"#{__MODULE__} #{log_fn.()} url=#{inspect(state.url)}"
371+
end)
372+
373+
:ok
374+
end
375+
end

0 commit comments

Comments
 (0)