Skip to content

Commit c22a417

Browse files
committed
feat(telemetry): add :telemetry.span for handle_failed/2 callback
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 370f180 commit c22a417

2 files changed

Lines changed: 105 additions & 0 deletions

File tree

lib/broadway/acknowledger.ex

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,19 @@ defmodule Broadway.Acknowledger do
102102
end
103103

104104
defp handle_failed_messages(messages, module, context) do
105+
metadata = %{module: module, messages: messages, context: context}
106+
107+
:telemetry.span(
108+
[:broadway, :handle_failed],
109+
metadata,
110+
fn ->
111+
result = do_handle_failed_messages(messages, module, context)
112+
{result, Map.put(metadata, :messages, result)}
113+
end
114+
)
115+
end
116+
117+
defp do_handle_failed_messages(messages, module, context) do
105118
module.handle_failed(messages, context)
106119
catch
107120
kind, reason ->

test/broadway_test.exs

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1939,6 +1939,98 @@ defmodule BroadwayTest do
19391939
end
19401940
end
19411941

1942+
describe "handle_failed telemetry" do
1943+
test "emits [:broadway, :handle_failed] span events from a processor" do
1944+
broadway_name = new_unique_name()
1945+
test_pid = self()
1946+
1947+
:telemetry.attach_many(
1948+
"#{broadway_name}-handle-failed",
1949+
[[:broadway, :handle_failed, :start], [:broadway, :handle_failed, :stop]],
1950+
fn event, _measurements, metadata, _config ->
1951+
send(test_pid, {:telemetry_event, event, metadata})
1952+
end,
1953+
%{}
1954+
)
1955+
1956+
{:ok, _broadway} =
1957+
Broadway.start_link(CustomHandlersWithHandleFailed,
1958+
name: broadway_name,
1959+
context: %{
1960+
handle_message: fn message, _ -> Message.failed(message, :failed) end,
1961+
handle_failed: fn messages, _ ->
1962+
send(test_pid, :handle_failed_called)
1963+
messages
1964+
end
1965+
},
1966+
producer: [module: {ManualProducer, []}],
1967+
processors: [default: []]
1968+
)
1969+
1970+
ref = Broadway.test_batch(broadway_name, [:fail], batch_mode: :flush)
1971+
1972+
assert_receive {:telemetry_event, [:broadway, :handle_failed, :start], start_metadata}
1973+
assert start_metadata.module == CustomHandlersWithHandleFailed
1974+
assert [%Message{}] = start_metadata.messages
1975+
1976+
assert_receive {:telemetry_event, [:broadway, :handle_failed, :stop], stop_metadata}
1977+
assert stop_metadata.module == CustomHandlersWithHandleFailed
1978+
assert [%Message{}] = stop_metadata.messages
1979+
1980+
assert_receive :handle_failed_called
1981+
assert_receive {:ack, ^ref, _successful, _failed}
1982+
1983+
:telemetry.detach("#{broadway_name}-handle-failed")
1984+
end
1985+
1986+
test "emits [:broadway, :handle_failed] span events from a batch processor" do
1987+
broadway_name = new_unique_name()
1988+
test_pid = self()
1989+
1990+
:telemetry.attach_many(
1991+
"#{broadway_name}-handle-failed",
1992+
[[:broadway, :handle_failed, :start], [:broadway, :handle_failed, :stop]],
1993+
fn event, _measurements, metadata, _config ->
1994+
send(test_pid, {:telemetry_event, event, metadata})
1995+
end,
1996+
%{}
1997+
)
1998+
1999+
{:ok, _broadway} =
2000+
Broadway.start_link(CustomHandlersWithHandleFailed,
2001+
name: broadway_name,
2002+
context: %{
2003+
handle_message: fn message, _ -> message end,
2004+
handle_batch: fn _, messages, _, _ ->
2005+
Enum.map(messages, &Message.failed(&1, :failed))
2006+
end,
2007+
handle_failed: fn messages, _ ->
2008+
send(test_pid, :handle_failed_called)
2009+
messages
2010+
end
2011+
},
2012+
producer: [module: {ManualProducer, []}],
2013+
processors: [default: []],
2014+
batchers: [default: []]
2015+
)
2016+
2017+
ref = Broadway.test_batch(broadway_name, [:fail], batch_mode: :flush)
2018+
2019+
assert_receive {:telemetry_event, [:broadway, :handle_failed, :start], start_metadata}
2020+
assert start_metadata.module == CustomHandlersWithHandleFailed
2021+
assert [%Message{}] = start_metadata.messages
2022+
2023+
assert_receive {:telemetry_event, [:broadway, :handle_failed, :stop], stop_metadata}
2024+
assert stop_metadata.module == CustomHandlersWithHandleFailed
2025+
assert [%Message{}] = stop_metadata.messages
2026+
2027+
assert_receive :handle_failed_called
2028+
assert_receive {:ack, ^ref, _successful, _failed}
2029+
2030+
:telemetry.detach("#{broadway_name}-handle-failed")
2031+
end
2032+
end
2033+
19422034
describe "handle producer crash" do
19432035
setup do
19442036
test_pid = self()

0 commit comments

Comments
 (0)