Skip to content

Commit ea7e14f

Browse files
solnicclaude
andcommitted
feat(tests): support allowance: [Broadway] with metadata-tagged routing
Adds Broadway to the :allowance dispatch. The handler subscribes to [:broadway, :processor, :start] and [:broadway, :batch_processor, :start], reads the first :sentry_test_owner found in metadata.messages[*].metadata, and calls allow_sentry_reports/2 for that test pid. This follows the same shape Broadway documents for Ecto sandbox testing (metadata: %{ecto_sandbox: self()}) — no wrapper around Broadway.test_message/3, no hidden state, and async-safe by design because each message carries its origin test pid. Messages without :sentry_test_owner are silently skipped. Integration coverage adds a minimal Broadway pipeline (PhoenixApp.TestBroadway) and a 3-test broadway_test.exs in phoenix_app proving: (1) tagged + allowance captures, (2) untagged + allowance does not, (3) tagged + no allowance does not. async: true on the describe block to validate cross-test isolation through a shared pipeline. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent b622a35 commit ea7e14f

6 files changed

Lines changed: 291 additions & 5 deletions

File tree

lib/sentry/test.ex

Lines changed: 77 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ defmodule Sentry.Test do
5252

5353
@moduledoc since: "10.2.0"
5454

55-
@compile {:no_warn_undefined, [Bypass, Plug.Conn, :telemetry]}
55+
@compile {:no_warn_undefined, [Bypass, Plug.Conn, :telemetry, Broadway]}
5656

5757
@ownership_server Sentry.Test.OwnershipServer
5858

@@ -75,9 +75,9 @@ defmodule Sentry.Test do
7575
7676
## Options
7777
78-
* `:allowance` - a list of integration module atoms (currently `Oban`)
79-
to enable automatic `Sentry.Test.allow_sentry_reports/2` wiring for.
80-
See the "Oban tests" section below.
78+
* `:allowance` - a list of integration module atoms (currently `Oban`
79+
and `Broadway`) to enable automatic `Sentry.Test.allow_sentry_reports/2`
80+
wiring for. See the "Oban tests" and "Broadway tests" sections below.
8181
8282
Any other key is forwarded to the per-test Sentry config (e.g.,
8383
`dedup_events: false`, `traces_sample_rate: 1.0`).
@@ -125,6 +125,41 @@ defmodule Sentry.Test do
125125
jobs) are not auto-tagged and require manual
126126
`Sentry.Test.allow_sentry_reports/2`.
127127
128+
## Broadway tests
129+
130+
To route events from a Broadway processor or batch-processor back
131+
to your test, pass `:sentry_test_owner` in the message metadata
132+
when injecting messages via `Broadway.test_message/3` or
133+
`Broadway.test_batch/3`:
134+
135+
setup do
136+
Sentry.Test.setup_sentry(allowance: [Broadway])
137+
start_supervised!(MyPipeline)
138+
:ok
139+
end
140+
141+
test "captures events from the processor" do
142+
ref =
143+
Broadway.test_message(MyPipeline, payload,
144+
metadata: %{sentry_test_owner: self()}
145+
)
146+
147+
assert_receive {:ack, ^ref, [_succeeded], []}
148+
149+
assert [%Sentry.Event{}] = Sentry.Test.pop_sentry_reports()
150+
end
151+
152+
This mirrors the [Ecto sandbox pattern documented in
153+
Broadway](https://hexdocs.pm/broadway/Broadway.html#module-testing-with-ecto):
154+
the test owner travels with the message itself, so two `async: true`
155+
tests racing through the same pipeline are routed independently.
156+
157+
Messages submitted without the `:sentry_test_owner` metadata are not
158+
auto-allowed — the handler silently skips them. For production
159+
producers (Kafka, SQS, etc.) that need the same routing, attach the
160+
same key to the messages they emit; the handler reads it regardless
161+
of source.
162+
128163
## Examples
129164
130165
setup do
@@ -950,6 +985,17 @@ defmodule Sentry.Test do
950985
]
951986
end
952987

988+
defp allowance_handlers(Broadway) do
989+
# Both events fire once per worker invocation (per batch) in the
990+
# processor / batch-processor pid, with metadata.messages giving
991+
# the full batch. Reading the owner from message metadata is the
992+
# documented Broadway pattern (same shape as `ecto_sandbox`).
993+
[
994+
{[:broadway, :processor, :start], {__MODULE__, :__handle_broadway_batch_start__}},
995+
{[:broadway, :batch_processor, :start], {__MODULE__, :__handle_broadway_batch_start__}}
996+
]
997+
end
998+
953999
defp allowance_handlers(_other), do: :unknown
9541000

9551001
# ── Oban allowance handlers ──
@@ -998,6 +1044,33 @@ defmodule Sentry.Test do
9981044

9991045
def __handle_oban_job_finish__(_event, _measurements, _metadata, _config), do: :ok
10001046

1047+
# ── Broadway allowance handler ──
1048+
#
1049+
# The handler walks the batch's messages looking for a
1050+
# `:sentry_test_owner` metadata entry — the documented Broadway test
1051+
# pattern, identical in shape to the `:ecto_sandbox` example in the
1052+
# Broadway testing guide. Tests submit messages via
1053+
# `Broadway.test_message/3` with
1054+
# `metadata: %{sentry_test_owner: self()}` (or any custom producer
1055+
# that propagates `:metadata` onto `%Broadway.Message{}`).
1056+
@doc false
1057+
def __handle_broadway_batch_start__(_event, _measurements, %{messages: messages}, _config)
1058+
when is_list(messages) do
1059+
case find_broadway_owner(messages) do
1060+
nil -> :ok
1061+
owner_pid -> safe_allow(owner_pid, self())
1062+
end
1063+
end
1064+
1065+
def __handle_broadway_batch_start__(_event, _measurements, _metadata, _config), do: :ok
1066+
1067+
defp find_broadway_owner(messages) do
1068+
Enum.find_value(messages, fn
1069+
%{metadata: %{sentry_test_owner: pid}} when is_pid(pid) -> pid
1070+
_ -> nil
1071+
end)
1072+
end
1073+
10011074
# Best-effort allow used by the Oban / Broadway dispatch handlers.
10021075
# Swallows the `ArgumentError` that `allow_sentry_reports/2` raises
10031076
# when:

test/sentry/test_test.exs

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -553,6 +553,134 @@ defmodule Sentry.TestTest do
553553
end
554554
end
555555

556+
describe "setup_sentry/1 with allowance: [Broadway] (synthetic events)" do
557+
setup do
558+
SentryTest.setup_sentry(allowance: [Broadway])
559+
end
560+
561+
test "processor batch start with a tagged message routes the worker" do
562+
test_pid = self()
563+
worker_done = make_ref()
564+
565+
worker =
566+
spawn(fn ->
567+
messages = [%{metadata: %{sentry_test_owner: test_pid}}]
568+
569+
:telemetry.execute(
570+
[:broadway, :processor, :start],
571+
%{},
572+
%{messages: messages}
573+
)
574+
575+
captured =
576+
case Sentry.capture_message("from broadway processor", result: :sync) do
577+
{:ok, _} -> :captured
578+
other -> {:unexpected, other}
579+
end
580+
581+
send(test_pid, {worker_done, captured})
582+
end)
583+
584+
ref = Process.monitor(worker)
585+
assert_receive {^worker_done, :captured}, 5_000
586+
assert_receive {:DOWN, ^ref, :process, ^worker, _}, 5_000
587+
588+
assert [%Sentry.Event{message: %{formatted: "from broadway processor"}}] =
589+
SentryTest.pop_sentry_reports()
590+
end
591+
592+
test "batch_processor start with a tagged message routes the worker" do
593+
test_pid = self()
594+
worker_done = make_ref()
595+
596+
worker =
597+
spawn(fn ->
598+
messages = [%{metadata: %{sentry_test_owner: test_pid}}]
599+
600+
:telemetry.execute(
601+
[:broadway, :batch_processor, :start],
602+
%{},
603+
%{messages: messages}
604+
)
605+
606+
captured =
607+
case Sentry.capture_message("from broadway batch", result: :sync) do
608+
{:ok, _} -> :captured
609+
other -> {:unexpected, other}
610+
end
611+
612+
send(test_pid, {worker_done, captured})
613+
end)
614+
615+
ref = Process.monitor(worker)
616+
assert_receive {^worker_done, :captured}, 5_000
617+
assert_receive {:DOWN, ^ref, :process, ^worker, _}, 5_000
618+
619+
assert [%Sentry.Event{message: %{formatted: "from broadway batch"}}] =
620+
SentryTest.pop_sentry_reports()
621+
end
622+
623+
test "messages without :sentry_test_owner metadata are not auto-allowed" do
624+
test_pid = self()
625+
worker_done = make_ref()
626+
627+
worker =
628+
spawn(fn ->
629+
messages = [%{metadata: %{some_other_key: :foo}}]
630+
631+
:telemetry.execute(
632+
[:broadway, :processor, :start],
633+
%{},
634+
%{messages: messages}
635+
)
636+
637+
captured =
638+
case Sentry.capture_message("untagged broadway", result: :sync) do
639+
{:ok, _} -> :captured
640+
other -> {:unexpected, other}
641+
end
642+
643+
send(test_pid, {worker_done, captured})
644+
end)
645+
646+
ref = Process.monitor(worker)
647+
assert_receive {^worker_done, :captured}, 5_000
648+
assert_receive {:DOWN, ^ref, :process, ^worker, _}, 5_000
649+
650+
assert [] == SentryTest.pop_sentry_reports()
651+
end
652+
653+
test "uses the first tagged message in a mixed batch" do
654+
test_pid = self()
655+
worker_done = make_ref()
656+
657+
worker =
658+
spawn(fn ->
659+
messages = [
660+
%{metadata: %{some_other_key: :foo}},
661+
%{metadata: %{sentry_test_owner: test_pid}},
662+
%{metadata: %{}}
663+
]
664+
665+
:telemetry.execute(
666+
[:broadway, :processor, :start],
667+
%{},
668+
%{messages: messages}
669+
)
670+
671+
Sentry.capture_message("mixed batch", result: :sync)
672+
send(test_pid, worker_done)
673+
end)
674+
675+
ref = Process.monitor(worker)
676+
assert_receive ^worker_done, 5_000
677+
assert_receive {:DOWN, ^ref, :process, ^worker, _}, 5_000
678+
679+
assert [%Sentry.Event{message: %{formatted: "mixed batch"}}] =
680+
SentryTest.pop_sentry_reports()
681+
end
682+
end
683+
556684
describe "before_send wrapping" do
557685
test "wraps existing before_send callback" do
558686
test_pid = self()

test_integrations/phoenix_app/mix.exs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ defmodule PhoenixApp.MixProject do
7575
{:opentelemetry_ecto, "~> 1.2"},
7676
{:opentelemetry_logger_metadata, "~> 0.2.0"},
7777
{:hackney, "~> 1.18"},
78-
{:oban, "~> 2.10"}
78+
{:oban, "~> 2.10"},
79+
{:broadway, "~> 1.0", only: [:test]}
7980
]
8081
end
8182

test_integrations/phoenix_app/mix.lock

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
%{
22
"acceptor_pool": {:hex, :acceptor_pool, "1.0.1", "d88c2e8a0be9216cf513fbcd3e5a4beb36bee3ff4168e85d6152c6f899359cdb", [:rebar3], [], "hexpm", "f172f3d74513e8edd445c257d596fc84dbdd56d2c6fa287434269648ae5a421e"},
33
"bandit": {:hex, :bandit, "1.10.3", "1e5d168fa79ec8de2860d1b4d878d97d4fbbe2fdbe7b0a7d9315a4359d1d4bb9", [:mix], [{:hpax, "~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.18", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "99a52d909c48db65ca598e1962797659e3c0f1d06e825a50c3d75b74a5e2db18"},
4+
"broadway": {:hex, :broadway, "1.3.0", "f75f6376159b74f55c5ba2629dac613e4fd79d9e71148ab5fbac8fdd7c999d2a", [:mix], [{:gen_stage, "~> 1.0", [hex: :gen_stage, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.3.7 or ~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "bef3b4c5512d0072917b70239cbecf8f76a2587465a5b7c3e2b9ae18b4bc405b"},
45
"bypass": {:hex, :bypass, "2.1.0", "909782781bf8e20ee86a9cabde36b259d44af8b9f38756173e8f5e2e1fabb9b1", [:mix], [{:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "d9b5df8fa5b7a6efa08384e9bbecfe4ce61c77d28a4282f79e02f1ef78d96b80"},
56
"castore": {:hex, :castore, "1.0.17", "4f9770d2d45fbd91dcf6bd404cf64e7e58fed04fadda0923dc32acca0badffa2", [:mix], [], "hexpm", "12d24b9d80b910dd3953e165636d68f147a31db945d2dcb9365e441f8b5351e5"},
67
"cc_precompiler": {:hex, :cc_precompiler, "0.1.11", "8c844d0b9fb98a3edea067f94f616b3f6b29b959b6b3bf25fee94ffe34364768", [:mix], [{:elixir_make, "~> 0.7", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "3427232caf0835f94680e5bcf082408a70b48ad68a5f5c0b02a3bea9f3a075b9"},
@@ -24,6 +25,7 @@
2425
"finch": {:hex, :finch, "0.21.0", "b1c3b2d48af02d0c66d2a9ebfb5622be5c5ecd62937cf79a88a7f98d48a8290c", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "87dc6e169794cb2570f75841a19da99cfde834249568f2a5b121b809588a4377"},
2526
"fine": {:hex, :fine, "0.1.4", "b19a89c1476c7c57afb5f9314aed5960b5bc95d5277de4cb5ee8e1d1616ce379", [:mix], [], "hexpm", "be3324cc454a42d80951cf6023b9954e9ff27c6daa255483b3e8d608670303f5"},
2627
"floki": {:hex, :floki, "0.38.0", "62b642386fa3f2f90713f6e231da0fa3256e41ef1089f83b6ceac7a3fd3abf33", [:mix], [], "hexpm", "a5943ee91e93fb2d635b612caf5508e36d37548e84928463ef9dd986f0d1abd9"},
28+
"gen_stage": {:hex, :gen_stage, "1.3.2", "7c77e5d1e97de2c6c2f78f306f463bca64bf2f4c3cdd606affc0100b89743b7b", [:mix], [], "hexpm", "0ffae547fa777b3ed889a6b9e1e64566217413d018cabd825f786e843ffe63e7"},
2729
"gettext": {:hex, :gettext, "0.26.2", "5978aa7b21fada6deabf1f6341ddba50bc69c999e812211903b169799208f2a8", [:mix], [{:expo, "~> 0.5.1 or ~> 1.0", [hex: :expo, repo: "hexpm", optional: false]}], "hexpm", "aa978504bcf76511efdc22d580ba08e2279caab1066b76bb9aa81c4a1e0a32a5"},
2830
"gproc": {:hex, :gproc, "0.9.1", "f1df0364423539cf0b80e8201c8b1839e229e5f9b3ccb944c5834626998f5b8c", [:rebar3], [], "hexpm", "905088e32e72127ed9466f0bac0d8e65704ca5e73ee5a62cb073c3117916d507"},
2931
"grpcbox": {:hex, :grpcbox, "0.17.1", "6e040ab3ef16fe699ffb513b0ef8e2e896da7b18931a1ef817143037c454bcce", [:rebar3], [{:acceptor_pool, "~> 1.0.0", [hex: :acceptor_pool, repo: "hexpm", optional: false]}, {:chatterbox, "~> 0.15.1", [hex: :ts_chatterbox, repo: "hexpm", optional: false]}, {:ctx, "~> 0.6.0", [hex: :ctx, repo: "hexpm", optional: false]}, {:gproc, "~> 0.9.1", [hex: :gproc, repo: "hexpm", optional: false]}], "hexpm", "4a3b5d7111daabc569dc9cbd9b202a3237d81c80bf97212fbc676832cb0ceb17"},
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
defmodule PhoenixApp.BroadwayTest do
2+
# async: true — the auto-allowance design routes per-message via the
3+
# :sentry_test_owner metadata, which Broadway propagates onto the
4+
# %Broadway.Message{} struct. Two of these tests racing against each
5+
# other on the same shared pipeline still produce the right per-test
6+
# results because each message carries its origin test pid.
7+
use ExUnit.Case, async: true
8+
9+
describe "setup_sentry/1 with allowance: [Broadway]" do
10+
setup do
11+
Sentry.Test.setup_sentry(allowance: [Broadway])
12+
start_supervised!(PhoenixApp.TestBroadway)
13+
:ok
14+
end
15+
16+
test "events from a Broadway processor are captured when tagged via metadata" do
17+
ref =
18+
Broadway.test_message(PhoenixApp.TestBroadway, :capture,
19+
metadata: %{sentry_test_owner: self()}
20+
)
21+
22+
assert_receive {:ack, ^ref, [_succeeded], []}, 5_000
23+
24+
assert [%Sentry.Event{message: %{formatted: "from broadway"}}] =
25+
Sentry.Test.pop_sentry_reports()
26+
end
27+
28+
test "raw Broadway.test_message without :sentry_test_owner is not auto-allowed" do
29+
ref = Broadway.test_message(PhoenixApp.TestBroadway, :capture)
30+
31+
assert_receive {:ack, ^ref, [_succeeded], []}, 5_000
32+
assert [] == Sentry.Test.pop_sentry_reports()
33+
end
34+
end
35+
36+
describe "without allowance" do
37+
setup do
38+
Sentry.Test.setup_sentry()
39+
start_supervised!(PhoenixApp.TestBroadway)
40+
:ok
41+
end
42+
43+
test "tagged messages are still dropped without allowance: [Broadway]" do
44+
ref =
45+
Broadway.test_message(PhoenixApp.TestBroadway, :capture,
46+
metadata: %{sentry_test_owner: self()}
47+
)
48+
49+
assert_receive {:ack, ^ref, [_succeeded], []}, 5_000
50+
assert [] == Sentry.Test.pop_sentry_reports()
51+
end
52+
end
53+
end
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
defmodule PhoenixApp.TestBroadway do
2+
@moduledoc false
3+
4+
use Broadway
5+
6+
def start_link(_opts \\ []) do
7+
Broadway.start_link(__MODULE__,
8+
name: __MODULE__,
9+
producer: [module: {Broadway.DummyProducer, []}],
10+
processors: [default: [concurrency: 1]]
11+
)
12+
end
13+
14+
@impl true
15+
def handle_message(_processor, %Broadway.Message{data: data} = message, _context) do
16+
case data do
17+
:capture ->
18+
Sentry.capture_message("from broadway", result: :sync)
19+
20+
:raise ->
21+
raise "broadway boom"
22+
23+
_ ->
24+
:ok
25+
end
26+
27+
message
28+
end
29+
end

0 commit comments

Comments
 (0)