Skip to content

Commit f76f8e5

Browse files
committed
feat(telemetry): add :telemetry.span for handle_failed/2 callback
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 370f180 commit f76f8e5

3 files changed

Lines changed: 158 additions & 4 deletions

File tree

lib/broadway.ex

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -673,8 +673,7 @@ defmodule Broadway do
673673
processor_key: atom,
674674
index: non_neg_integer,
675675
messages: [Broadway.Message.t],
676-
telemetry_span_context: reference,
677-
producer: {atom, list}
676+
telemetry_span_context: reference
678677
}
679678
```
680679
@@ -695,8 +694,7 @@ defmodule Broadway do
695694
successful_messages_to_ack: [Broadway.Message.t],
696695
successful_messages_to_forward: [Broadway.Message.t],
697696
failed_messages: [Broadway.Message.t],
698-
telemetry_span_context: reference,
699-
producer: {atom, list}
697+
telemetry_span_context: reference
700698
}
701699
```
702700
@@ -795,6 +793,57 @@ defmodule Broadway do
795793
}
796794
```
797795
796+
* `[:broadway, :handle_failed, :start]` - Dispatched before the `c:handle_failed/2`
797+
callback is invoked
798+
799+
* Measurement: `%{system_time: integer}`
800+
801+
* Metadata:
802+
803+
```
804+
%{
805+
module: atom,
806+
messages: [Broadway.Message.t],
807+
context: term,
808+
telemetry_span_context: reference
809+
}
810+
```
811+
812+
* `[:broadway, :handle_failed, :stop]` - Dispatched after the `c:handle_failed/2`
813+
callback has returned
814+
815+
* Measurement: `%{duration: native_time}`
816+
817+
* Metadata:
818+
819+
```
820+
%{
821+
module: atom,
822+
messages: [Broadway.Message.t],
823+
context: term,
824+
telemetry_span_context: reference
825+
}
826+
```
827+
828+
* `[:broadway, :handle_failed, :exception]` - Dispatched if the `c:handle_failed/2`
829+
callback raises an exception
830+
831+
* Measurement: `%{duration: native_time}`
832+
833+
* Metadata:
834+
835+
```
836+
%{
837+
module: atom,
838+
messages: [Broadway.Message.t],
839+
context: term,
840+
kind: :error | :exit | :throw,
841+
reason: term,
842+
stacktrace: list,
843+
telemetry_span_context: reference
844+
}
845+
```
846+
798847
* `[:broadway, :batcher, :start]` - Dispatched by a Broadway batcher before
799848
handling events
800849

lib/broadway/acknowledger.ex

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,19 @@ defmodule Broadway.Acknowledger do
102102
end
103103

104104
defp handle_failed_messages(messages, module, context) do
105+
metadata = %{module: module, messages: messages, context: context}
106+
107+
:telemetry.span(
108+
[:broadway, :handle_failed],
109+
metadata,
110+
fn ->
111+
result = do_handle_failed_messages(messages, module, context)
112+
{result, Map.put(metadata, :messages, result)}
113+
end
114+
)
115+
end
116+
117+
defp do_handle_failed_messages(messages, module, context) do
105118
module.handle_failed(messages, context)
106119
catch
107120
kind, reason ->

test/broadway_test.exs

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1939,6 +1939,98 @@ defmodule BroadwayTest do
19391939
end
19401940
end
19411941

1942+
describe "handle_failed telemetry" do
1943+
test "emits [:broadway, :handle_failed] span events from a processor" do
1944+
broadway_name = new_unique_name()
1945+
test_pid = self()
1946+
1947+
:telemetry.attach_many(
1948+
"#{broadway_name}-handle-failed",
1949+
[[:broadway, :handle_failed, :start], [:broadway, :handle_failed, :stop]],
1950+
fn event, _measurements, metadata, _config ->
1951+
send(test_pid, {:telemetry_event, event, metadata})
1952+
end,
1953+
%{}
1954+
)
1955+
1956+
{:ok, _broadway} =
1957+
Broadway.start_link(CustomHandlersWithHandleFailed,
1958+
name: broadway_name,
1959+
context: %{
1960+
handle_message: fn message, _ -> Message.failed(message, :failed) end,
1961+
handle_failed: fn messages, _ ->
1962+
send(test_pid, :handle_failed_called)
1963+
messages
1964+
end
1965+
},
1966+
producer: [module: {ManualProducer, []}],
1967+
processors: [default: []]
1968+
)
1969+
1970+
ref = Broadway.test_batch(broadway_name, [:fail], batch_mode: :flush)
1971+
1972+
assert_receive {:telemetry_event, [:broadway, :handle_failed, :start], start_metadata}
1973+
assert start_metadata.module == CustomHandlersWithHandleFailed
1974+
assert [%Message{}] = start_metadata.messages
1975+
1976+
assert_receive {:telemetry_event, [:broadway, :handle_failed, :stop], stop_metadata}
1977+
assert stop_metadata.module == CustomHandlersWithHandleFailed
1978+
assert [%Message{}] = stop_metadata.messages
1979+
1980+
assert_receive :handle_failed_called
1981+
assert_receive {:ack, ^ref, _successful, _failed}
1982+
1983+
:telemetry.detach("#{broadway_name}-handle-failed")
1984+
end
1985+
1986+
test "emits [:broadway, :handle_failed] span events from a batch processor" do
1987+
broadway_name = new_unique_name()
1988+
test_pid = self()
1989+
1990+
:telemetry.attach_many(
1991+
"#{broadway_name}-handle-failed",
1992+
[[:broadway, :handle_failed, :start], [:broadway, :handle_failed, :stop]],
1993+
fn event, _measurements, metadata, _config ->
1994+
send(test_pid, {:telemetry_event, event, metadata})
1995+
end,
1996+
%{}
1997+
)
1998+
1999+
{:ok, _broadway} =
2000+
Broadway.start_link(CustomHandlersWithHandleFailed,
2001+
name: broadway_name,
2002+
context: %{
2003+
handle_message: fn message, _ -> message end,
2004+
handle_batch: fn _, messages, _, _ ->
2005+
Enum.map(messages, &Message.failed(&1, :failed))
2006+
end,
2007+
handle_failed: fn messages, _ ->
2008+
send(test_pid, :handle_failed_called)
2009+
messages
2010+
end
2011+
},
2012+
producer: [module: {ManualProducer, []}],
2013+
processors: [default: []],
2014+
batchers: [default: []]
2015+
)
2016+
2017+
ref = Broadway.test_batch(broadway_name, [:fail], batch_mode: :flush)
2018+
2019+
assert_receive {:telemetry_event, [:broadway, :handle_failed, :start], start_metadata}
2020+
assert start_metadata.module == CustomHandlersWithHandleFailed
2021+
assert [%Message{}] = start_metadata.messages
2022+
2023+
assert_receive {:telemetry_event, [:broadway, :handle_failed, :stop], stop_metadata}
2024+
assert stop_metadata.module == CustomHandlersWithHandleFailed
2025+
assert [%Message{}] = stop_metadata.messages
2026+
2027+
assert_receive :handle_failed_called
2028+
assert_receive {:ack, ^ref, _successful, _failed}
2029+
2030+
:telemetry.detach("#{broadway_name}-handle-failed")
2031+
end
2032+
end
2033+
19422034
describe "handle producer crash" do
19432035
setup do
19442036
test_pid = self()

0 commit comments

Comments
 (0)