Skip to content

Commit 966c59a

Browse files
authored
Add :telemetry.span for handle_failed/2 callback (#369)
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 370f180 commit 966c59a

3 files changed

Lines changed: 218 additions & 24 deletions

File tree

lib/broadway.ex

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -795,6 +795,57 @@ defmodule Broadway do
795795
}
796796
```
797797
798+
* `[:broadway, :handle_failed, :start]` - Dispatched before the `c:handle_failed/2`
799+
callback is invoked
800+
801+
* Measurement: `%{system_time: integer}`
802+
803+
* Metadata:
804+
805+
```
806+
%{
807+
module: atom,
808+
messages: [Broadway.Message.t],
809+
context: term,
810+
telemetry_span_context: reference
811+
}
812+
```
813+
814+
* `[:broadway, :handle_failed, :stop]` - Dispatched after the `c:handle_failed/2`
815+
callback has returned
816+
817+
* Measurement: `%{duration: native_time}`
818+
819+
* Metadata:
820+
821+
```
822+
%{
823+
module: atom,
824+
messages: [Broadway.Message.t],
825+
context: term,
826+
telemetry_span_context: reference
827+
}
828+
```
829+
830+
* `[:broadway, :handle_failed, :exception]` - Dispatched if the `c:handle_failed/2`
831+
callback raises an exception
832+
833+
* Measurement: `%{duration: native_time}`
834+
835+
* Metadata:
836+
837+
```
838+
%{
839+
module: atom,
840+
messages: [Broadway.Message.t],
841+
context: term,
842+
kind: :error | :exit | :throw,
843+
reason: term,
844+
stacktrace: list,
845+
telemetry_span_context: reference
846+
}
847+
```
848+
798849
* `[:broadway, :batcher, :start]` - Dispatched by a Broadway batcher before
799850
handling events
800851

lib/broadway/acknowledger.ex

Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -102,35 +102,50 @@ defmodule Broadway.Acknowledger do
102102
end
103103

104104
defp handle_failed_messages(messages, module, context) do
105-
module.handle_failed(messages, context)
106-
catch
107-
kind, reason ->
108-
Logger.error(Exception.format(kind, reason, __STACKTRACE__),
109-
crash_reason: crash_reason(kind, reason, __STACKTRACE__)
105+
metadata = %{module: module, messages: messages, context: context}
106+
107+
try do
108+
:telemetry.span(
109+
[:broadway, :handle_failed],
110+
metadata,
111+
fn ->
112+
result = do_handle_failed_messages(messages, module, context)
113+
{result, Map.put(metadata, :messages, result)}
114+
end
110115
)
116+
catch
117+
kind, reason ->
118+
Logger.error(Exception.format(kind, reason, __STACKTRACE__),
119+
crash_reason: crash_reason(kind, reason, __STACKTRACE__)
120+
)
111121

112-
messages
113-
else
114-
return_messages when is_list(return_messages) ->
115-
size = length(messages)
116-
return_size = length(return_messages)
122+
messages
123+
end
124+
end
117125

118-
if return_size != size do
119-
Logger.error(
120-
"#{inspect(module)}.handle_failed/2 received #{size} messages and " <>
121-
"returned only #{return_size}. All messages given to handle_failed/2 " <>
122-
"must be returned"
123-
)
124-
end
126+
defp do_handle_failed_messages(messages, module, context) do
127+
case module.handle_failed(messages, context) do
128+
return_messages when is_list(return_messages) ->
129+
size = length(messages)
130+
return_size = length(return_messages)
125131

126-
return_messages
132+
if return_size != size do
133+
Logger.error(
134+
"#{inspect(module)}.handle_failed/2 received #{size} messages and " <>
135+
"returned only #{return_size}. All messages given to handle_failed/2 " <>
136+
"must be returned"
137+
)
138+
end
127139

128-
_other ->
129-
Logger.error(
130-
"#{inspect(module)}.handle_failed/2 didn't return a list of messages, " <>
131-
"so ignoring its return value"
132-
)
140+
return_messages
133141

134-
messages
142+
_other ->
143+
Logger.error(
144+
"#{inspect(module)}.handle_failed/2 didn't return a list of messages, " <>
145+
"so ignoring its return value"
146+
)
147+
148+
messages
149+
end
135150
end
136151
end

test/broadway_test.exs

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1939,6 +1939,134 @@ defmodule BroadwayTest do
19391939
end
19401940
end
19411941

1942+
describe "handle_failed telemetry" do
1943+
test "emits [:broadway, :handle_failed] span events from a processor" do
1944+
broadway_name = new_unique_name()
1945+
test_pid = self()
1946+
1947+
:telemetry.attach_many(
1948+
"#{broadway_name}-handle-failed",
1949+
[[:broadway, :handle_failed, :start], [:broadway, :handle_failed, :stop]],
1950+
fn event, _measurements, metadata, _config ->
1951+
send(test_pid, {:telemetry_event, event, metadata})
1952+
end,
1953+
%{}
1954+
)
1955+
1956+
{:ok, _broadway} =
1957+
Broadway.start_link(CustomHandlersWithHandleFailed,
1958+
name: broadway_name,
1959+
context: %{
1960+
handle_message: fn message, _ -> Message.failed(message, :failed) end,
1961+
handle_failed: fn messages, _ ->
1962+
send(test_pid, :handle_failed_called)
1963+
messages
1964+
end
1965+
},
1966+
producer: [module: {ManualProducer, []}],
1967+
processors: [default: []]
1968+
)
1969+
1970+
ref = Broadway.test_batch(broadway_name, [:fail], batch_mode: :flush)
1971+
1972+
assert_receive {:telemetry_event, [:broadway, :handle_failed, :start], start_metadata}
1973+
assert start_metadata.module == CustomHandlersWithHandleFailed
1974+
assert [%Message{}] = start_metadata.messages
1975+
1976+
assert_receive {:telemetry_event, [:broadway, :handle_failed, :stop], stop_metadata}
1977+
assert stop_metadata.module == CustomHandlersWithHandleFailed
1978+
assert [%Message{}] = stop_metadata.messages
1979+
1980+
assert_receive :handle_failed_called
1981+
assert_receive {:ack, ^ref, _successful, _failed}
1982+
1983+
:telemetry.detach("#{broadway_name}-handle-failed")
1984+
end
1985+
1986+
test "emits [:broadway, :handle_failed] span events from a batch processor" do
1987+
broadway_name = new_unique_name()
1988+
test_pid = self()
1989+
1990+
:telemetry.attach_many(
1991+
"#{broadway_name}-handle-failed",
1992+
[[:broadway, :handle_failed, :start], [:broadway, :handle_failed, :stop]],
1993+
fn event, _measurements, metadata, _config ->
1994+
send(test_pid, {:telemetry_event, event, metadata})
1995+
end,
1996+
%{}
1997+
)
1998+
1999+
{:ok, _broadway} =
2000+
Broadway.start_link(CustomHandlersWithHandleFailed,
2001+
name: broadway_name,
2002+
context: %{
2003+
handle_message: fn message, _ -> message end,
2004+
handle_batch: fn _, messages, _, _ ->
2005+
Enum.map(messages, &Message.failed(&1, :failed))
2006+
end,
2007+
handle_failed: fn messages, _ ->
2008+
send(test_pid, :handle_failed_called)
2009+
messages
2010+
end
2011+
},
2012+
producer: [module: {ManualProducer, []}],
2013+
processors: [default: []],
2014+
batchers: [default: []]
2015+
)
2016+
2017+
ref = Broadway.test_batch(broadway_name, [:fail], batch_mode: :flush)
2018+
2019+
assert_receive {:telemetry_event, [:broadway, :handle_failed, :start], start_metadata}
2020+
assert start_metadata.module == CustomHandlersWithHandleFailed
2021+
assert [%Message{}] = start_metadata.messages
2022+
2023+
assert_receive {:telemetry_event, [:broadway, :handle_failed, :stop], stop_metadata}
2024+
assert stop_metadata.module == CustomHandlersWithHandleFailed
2025+
assert [%Message{}] = stop_metadata.messages
2026+
2027+
assert_receive :handle_failed_called
2028+
assert_receive {:ack, ^ref, _successful, _failed}
2029+
2030+
:telemetry.detach("#{broadway_name}-handle-failed")
2031+
end
2032+
2033+
test "emits [:broadway, :handle_failed, :exception] when handle_failed/2 raises" do
2034+
broadway_name = new_unique_name()
2035+
test_pid = self()
2036+
2037+
:telemetry.attach_many(
2038+
"#{broadway_name}-handle-failed",
2039+
[[:broadway, :handle_failed, :start], [:broadway, :handle_failed, :exception]],
2040+
fn event, _measurements, metadata, _config ->
2041+
send(test_pid, {:telemetry_event, event, metadata})
2042+
end,
2043+
%{}
2044+
)
2045+
2046+
{:ok, _broadway} =
2047+
Broadway.start_link(CustomHandlersWithHandleFailed,
2048+
name: broadway_name,
2049+
context: %{
2050+
handle_message: fn message, _ -> Message.failed(message, :failed) end,
2051+
handle_failed: fn _messages, _ -> raise "oops" end
2052+
},
2053+
producer: [module: {ManualProducer, []}],
2054+
processors: [default: []]
2055+
)
2056+
2057+
capture_log(fn ->
2058+
ref = Broadway.test_batch(broadway_name, [:fail], batch_mode: :flush)
2059+
assert_receive {:ack, ^ref, _successful, _failed}
2060+
end)
2061+
2062+
assert_receive {:telemetry_event, [:broadway, :handle_failed, :start], _}
2063+
assert_receive {:telemetry_event, [:broadway, :handle_failed, :exception], metadata}
2064+
assert %{kind: :error, reason: %RuntimeError{message: "oops"}} = metadata
2065+
2066+
:telemetry.detach("#{broadway_name}-handle-failed")
2067+
end
2068+
end
2069+
19422070
describe "handle producer crash" do
19432071
setup do
19442072
test_pid = self()

0 commit comments

Comments
 (0)