Skip to content

Commit ee71a75

Browse files
solnicclaude
andcommitted
feat(tests): support allowance: [Broadway] with metadata-tagged routing
Adds Broadway to the :allowance dispatch. The handler subscribes to [:broadway, :processor, :start] and [:broadway, :batch_processor, :start], reads the first :sentry_test_owner found in metadata.messages[*].metadata, and calls allow_sentry_reports/2 for that test pid. This follows the same shape Broadway documents for Ecto sandbox testing (metadata: %{ecto_sandbox: self()}) — no wrapper around Broadway.test_message/3, no hidden state, and async-safe by design because each message carries its origin test pid. Messages without :sentry_test_owner are silently skipped. Integration coverage adds a minimal Broadway pipeline (PhoenixApp.TestBroadway) and a 3-test broadway_test.exs in phoenix_app proving: (1) tagged + allowance captures, (2) untagged + allowance does not, (3) tagged + no allowance does not. async: true on the describe block to validate cross-test isolation through a shared pipeline. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent d1281d1 commit ee71a75

6 files changed

Lines changed: 291 additions & 5 deletions

File tree

lib/sentry/test.ex

Lines changed: 77 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ defmodule Sentry.Test do
5353

5454
@moduledoc since: "10.2.0"
5555

56-
@compile {:no_warn_undefined, [Bypass, Plug.Conn, NimbleOwnership, :telemetry]}
56+
@compile {:no_warn_undefined, [Bypass, Plug.Conn, NimbleOwnership, :telemetry, Broadway]}
5757

5858
@ownership_server Sentry.Test.OwnershipServer
5959

@@ -76,9 +76,9 @@ defmodule Sentry.Test do
7676
7777
## Options
7878
79-
* `:allowance` - a list of integration module atoms (currently `Oban`)
80-
to enable automatic `Sentry.Test.allow_sentry_reports/2` wiring for.
81-
See the "Oban tests" section below.
79+
* `:allowance` - a list of integration module atoms (currently `Oban`
80+
and `Broadway`) to enable automatic `Sentry.Test.allow_sentry_reports/2`
81+
wiring for. See the "Oban tests" and "Broadway tests" sections below.
8282
8383
Any other key is forwarded to the per-test Sentry config (e.g.,
8484
`dedup_events: false`, `traces_sample_rate: 1.0`).
@@ -126,6 +126,41 @@ defmodule Sentry.Test do
126126
jobs) are not auto-tagged and require manual
127127
`Sentry.Test.allow_sentry_reports/2`.
128128
129+
## Broadway tests
130+
131+
To route events from a Broadway processor or batch-processor back
132+
to your test, pass `:sentry_test_owner` in the message metadata
133+
when injecting messages via `Broadway.test_message/3` or
134+
`Broadway.test_batch/3`:
135+
136+
setup do
137+
Sentry.Test.setup_sentry(allowance: [Broadway])
138+
start_supervised!(MyPipeline)
139+
:ok
140+
end
141+
142+
test "captures events from the processor" do
143+
ref =
144+
Broadway.test_message(MyPipeline, payload,
145+
metadata: %{sentry_test_owner: self()}
146+
)
147+
148+
assert_receive {:ack, ^ref, [_succeeded], []}
149+
150+
assert [%Sentry.Event{}] = Sentry.Test.pop_sentry_reports()
151+
end
152+
153+
This mirrors the [Ecto sandbox pattern documented in
154+
Broadway](https://hexdocs.pm/broadway/Broadway.html#module-testing-with-ecto):
155+
the test owner travels with the message itself, so two `async: true`
156+
tests racing through the same pipeline are routed independently.
157+
158+
Messages submitted without the `:sentry_test_owner` metadata are not
159+
auto-allowed — the handler silently skips them. For production
160+
producers (Kafka, SQS, etc.) that need the same routing, attach the
161+
same key to the messages they emit; the handler reads it regardless
162+
of source.
163+
129164
## Examples
130165
131166
setup do
@@ -965,6 +1000,17 @@ defmodule Sentry.Test do
9651000
]
9661001
end
9671002

1003+
defp allowance_handlers(Broadway) do
1004+
# Both events fire once per worker invocation (per batch) in the
1005+
# processor / batch-processor pid, with metadata.messages giving
1006+
# the full batch. Reading the owner from message metadata is the
1007+
# documented Broadway pattern (same shape as `ecto_sandbox`).
1008+
[
1009+
{[:broadway, :processor, :start], {__MODULE__, :__handle_broadway_batch_start__}},
1010+
{[:broadway, :batch_processor, :start], {__MODULE__, :__handle_broadway_batch_start__}}
1011+
]
1012+
end
1013+
9681014
defp allowance_handlers(_other), do: :unknown
9691015

9701016
# ── Oban allowance handlers ──
@@ -1013,6 +1059,33 @@ defmodule Sentry.Test do
10131059

10141060
def __handle_oban_job_finish__(_event, _measurements, _metadata, _config), do: :ok
10151061

1062+
# ── Broadway allowance handler ──
1063+
#
1064+
# The handler walks the batch's messages looking for a
1065+
# `:sentry_test_owner` metadata entry — the documented Broadway test
1066+
# pattern, identical in shape to the `:ecto_sandbox` example in the
1067+
# Broadway testing guide. Tests submit messages via
1068+
# `Broadway.test_message/3` with
1069+
# `metadata: %{sentry_test_owner: self()}` (or any custom producer
1070+
# that propagates `:metadata` onto `%Broadway.Message{}`).
1071+
@doc false
1072+
def __handle_broadway_batch_start__(_event, _measurements, %{messages: messages}, _config)
1073+
when is_list(messages) do
1074+
case find_broadway_owner(messages) do
1075+
nil -> :ok
1076+
owner_pid -> safe_allow(owner_pid, self())
1077+
end
1078+
end
1079+
1080+
def __handle_broadway_batch_start__(_event, _measurements, _metadata, _config), do: :ok
1081+
1082+
defp find_broadway_owner(messages) do
1083+
Enum.find_value(messages, fn
1084+
%{metadata: %{sentry_test_owner: pid}} when is_pid(pid) -> pid
1085+
_ -> nil
1086+
end)
1087+
end
1088+
10161089
# Best-effort allow used by the Oban / Broadway dispatch handlers.
10171090
# Swallows the `ArgumentError` that `allow_sentry_reports/2` raises
10181091
# when:

test/sentry/test_test.exs

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -552,6 +552,134 @@ defmodule Sentry.TestTest do
552552
end
553553
end
554554

555+
describe "setup_sentry/1 with allowance: [Broadway] (synthetic events)" do
556+
setup do
557+
SentryTest.setup_sentry(allowance: [Broadway])
558+
end
559+
560+
test "processor batch start with a tagged message routes the worker" do
561+
test_pid = self()
562+
worker_done = make_ref()
563+
564+
worker =
565+
spawn(fn ->
566+
messages = [%{metadata: %{sentry_test_owner: test_pid}}]
567+
568+
:telemetry.execute(
569+
[:broadway, :processor, :start],
570+
%{},
571+
%{messages: messages}
572+
)
573+
574+
captured =
575+
case Sentry.capture_message("from broadway processor", result: :sync) do
576+
{:ok, _} -> :captured
577+
other -> {:unexpected, other}
578+
end
579+
580+
send(test_pid, {worker_done, captured})
581+
end)
582+
583+
ref = Process.monitor(worker)
584+
assert_receive {^worker_done, :captured}, 5_000
585+
assert_receive {:DOWN, ^ref, :process, ^worker, _}, 5_000
586+
587+
assert [%Sentry.Event{message: %{formatted: "from broadway processor"}}] =
588+
SentryTest.pop_sentry_reports()
589+
end
590+
591+
test "batch_processor start with a tagged message routes the worker" do
592+
test_pid = self()
593+
worker_done = make_ref()
594+
595+
worker =
596+
spawn(fn ->
597+
messages = [%{metadata: %{sentry_test_owner: test_pid}}]
598+
599+
:telemetry.execute(
600+
[:broadway, :batch_processor, :start],
601+
%{},
602+
%{messages: messages}
603+
)
604+
605+
captured =
606+
case Sentry.capture_message("from broadway batch", result: :sync) do
607+
{:ok, _} -> :captured
608+
other -> {:unexpected, other}
609+
end
610+
611+
send(test_pid, {worker_done, captured})
612+
end)
613+
614+
ref = Process.monitor(worker)
615+
assert_receive {^worker_done, :captured}, 5_000
616+
assert_receive {:DOWN, ^ref, :process, ^worker, _}, 5_000
617+
618+
assert [%Sentry.Event{message: %{formatted: "from broadway batch"}}] =
619+
SentryTest.pop_sentry_reports()
620+
end
621+
622+
test "messages without :sentry_test_owner metadata are not auto-allowed" do
623+
test_pid = self()
624+
worker_done = make_ref()
625+
626+
worker =
627+
spawn(fn ->
628+
messages = [%{metadata: %{some_other_key: :foo}}]
629+
630+
:telemetry.execute(
631+
[:broadway, :processor, :start],
632+
%{},
633+
%{messages: messages}
634+
)
635+
636+
captured =
637+
case Sentry.capture_message("untagged broadway", result: :sync) do
638+
{:ok, _} -> :captured
639+
other -> {:unexpected, other}
640+
end
641+
642+
send(test_pid, {worker_done, captured})
643+
end)
644+
645+
ref = Process.monitor(worker)
646+
assert_receive {^worker_done, :captured}, 5_000
647+
assert_receive {:DOWN, ^ref, :process, ^worker, _}, 5_000
648+
649+
assert [] == SentryTest.pop_sentry_reports()
650+
end
651+
652+
test "uses the first tagged message in a mixed batch" do
653+
test_pid = self()
654+
worker_done = make_ref()
655+
656+
worker =
657+
spawn(fn ->
658+
messages = [
659+
%{metadata: %{some_other_key: :foo}},
660+
%{metadata: %{sentry_test_owner: test_pid}},
661+
%{metadata: %{}}
662+
]
663+
664+
:telemetry.execute(
665+
[:broadway, :processor, :start],
666+
%{},
667+
%{messages: messages}
668+
)
669+
670+
Sentry.capture_message("mixed batch", result: :sync)
671+
send(test_pid, worker_done)
672+
end)
673+
674+
ref = Process.monitor(worker)
675+
assert_receive ^worker_done, 5_000
676+
assert_receive {:DOWN, ^ref, :process, ^worker, _}, 5_000
677+
678+
assert [%Sentry.Event{message: %{formatted: "mixed batch"}}] =
679+
SentryTest.pop_sentry_reports()
680+
end
681+
end
682+
555683
describe "before_send wrapping" do
556684
test "wraps existing before_send callback" do
557685
test_pid = self()

test_integrations/phoenix_app/mix.exs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ defmodule PhoenixApp.MixProject do
7575
{:opentelemetry_ecto, "~> 1.2"},
7676
{:opentelemetry_logger_metadata, "~> 0.2.0"},
7777
{:hackney, "~> 1.18"},
78-
{:oban, "~> 2.10"}
78+
{:oban, "~> 2.10"},
79+
{:broadway, "~> 1.0", only: [:test]}
7980
]
8081
end
8182

test_integrations/phoenix_app/mix.lock

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
%{
22
"acceptor_pool": {:hex, :acceptor_pool, "1.0.1", "d88c2e8a0be9216cf513fbcd3e5a4beb36bee3ff4168e85d6152c6f899359cdb", [:rebar3], [], "hexpm", "f172f3d74513e8edd445c257d596fc84dbdd56d2c6fa287434269648ae5a421e"},
33
"bandit": {:hex, :bandit, "1.10.3", "1e5d168fa79ec8de2860d1b4d878d97d4fbbe2fdbe7b0a7d9315a4359d1d4bb9", [:mix], [{:hpax, "~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.18", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "99a52d909c48db65ca598e1962797659e3c0f1d06e825a50c3d75b74a5e2db18"},
4+
"broadway": {:hex, :broadway, "1.3.0", "f75f6376159b74f55c5ba2629dac613e4fd79d9e71148ab5fbac8fdd7c999d2a", [:mix], [{:gen_stage, "~> 1.0", [hex: :gen_stage, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.3.7 or ~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "bef3b4c5512d0072917b70239cbecf8f76a2587465a5b7c3e2b9ae18b4bc405b"},
45
"bypass": {:hex, :bypass, "2.1.0", "909782781bf8e20ee86a9cabde36b259d44af8b9f38756173e8f5e2e1fabb9b1", [:mix], [{:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "d9b5df8fa5b7a6efa08384e9bbecfe4ce61c77d28a4282f79e02f1ef78d96b80"},
56
"castore": {:hex, :castore, "1.0.17", "4f9770d2d45fbd91dcf6bd404cf64e7e58fed04fadda0923dc32acca0badffa2", [:mix], [], "hexpm", "12d24b9d80b910dd3953e165636d68f147a31db945d2dcb9365e441f8b5351e5"},
67
"cc_precompiler": {:hex, :cc_precompiler, "0.1.11", "8c844d0b9fb98a3edea067f94f616b3f6b29b959b6b3bf25fee94ffe34364768", [:mix], [{:elixir_make, "~> 0.7", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "3427232caf0835f94680e5bcf082408a70b48ad68a5f5c0b02a3bea9f3a075b9"},
@@ -24,6 +25,7 @@
2425
"finch": {:hex, :finch, "0.21.0", "b1c3b2d48af02d0c66d2a9ebfb5622be5c5ecd62937cf79a88a7f98d48a8290c", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "87dc6e169794cb2570f75841a19da99cfde834249568f2a5b121b809588a4377"},
2526
"fine": {:hex, :fine, "0.1.4", "b19a89c1476c7c57afb5f9314aed5960b5bc95d5277de4cb5ee8e1d1616ce379", [:mix], [], "hexpm", "be3324cc454a42d80951cf6023b9954e9ff27c6daa255483b3e8d608670303f5"},
2627
"floki": {:hex, :floki, "0.38.0", "62b642386fa3f2f90713f6e231da0fa3256e41ef1089f83b6ceac7a3fd3abf33", [:mix], [], "hexpm", "a5943ee91e93fb2d635b612caf5508e36d37548e84928463ef9dd986f0d1abd9"},
28+
"gen_stage": {:hex, :gen_stage, "1.3.2", "7c77e5d1e97de2c6c2f78f306f463bca64bf2f4c3cdd606affc0100b89743b7b", [:mix], [], "hexpm", "0ffae547fa777b3ed889a6b9e1e64566217413d018cabd825f786e843ffe63e7"},
2729
"gettext": {:hex, :gettext, "0.26.2", "5978aa7b21fada6deabf1f6341ddba50bc69c999e812211903b169799208f2a8", [:mix], [{:expo, "~> 0.5.1 or ~> 1.0", [hex: :expo, repo: "hexpm", optional: false]}], "hexpm", "aa978504bcf76511efdc22d580ba08e2279caab1066b76bb9aa81c4a1e0a32a5"},
2830
"gproc": {:hex, :gproc, "0.9.1", "f1df0364423539cf0b80e8201c8b1839e229e5f9b3ccb944c5834626998f5b8c", [:rebar3], [], "hexpm", "905088e32e72127ed9466f0bac0d8e65704ca5e73ee5a62cb073c3117916d507"},
2931
"grpcbox": {:hex, :grpcbox, "0.17.1", "6e040ab3ef16fe699ffb513b0ef8e2e896da7b18931a1ef817143037c454bcce", [:rebar3], [{:acceptor_pool, "~> 1.0.0", [hex: :acceptor_pool, repo: "hexpm", optional: false]}, {:chatterbox, "~> 0.15.1", [hex: :ts_chatterbox, repo: "hexpm", optional: false]}, {:ctx, "~> 0.6.0", [hex: :ctx, repo: "hexpm", optional: false]}, {:gproc, "~> 0.9.1", [hex: :gproc, repo: "hexpm", optional: false]}], "hexpm", "4a3b5d7111daabc569dc9cbd9b202a3237d81c80bf97212fbc676832cb0ceb17"},
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
defmodule PhoenixApp.BroadwayTest do
2+
# async: true — the auto-allowance design routes per-message via the
3+
# :sentry_test_owner metadata, which Broadway propagates onto the
4+
# %Broadway.Message{} struct. Two of these tests racing against each
5+
# other on the same shared pipeline still produce the right per-test
6+
# results because each message carries its origin test pid.
7+
use ExUnit.Case, async: true
8+
9+
describe "setup_sentry/1 with allowance: [Broadway]" do
10+
setup do
11+
Sentry.Test.setup_sentry(allowance: [Broadway])
12+
start_supervised!(PhoenixApp.TestBroadway)
13+
:ok
14+
end
15+
16+
test "events from a Broadway processor are captured when tagged via metadata" do
17+
ref =
18+
Broadway.test_message(PhoenixApp.TestBroadway, :capture,
19+
metadata: %{sentry_test_owner: self()}
20+
)
21+
22+
assert_receive {:ack, ^ref, [_succeeded], []}, 5_000
23+
24+
assert [%Sentry.Event{message: %{formatted: "from broadway"}}] =
25+
Sentry.Test.pop_sentry_reports()
26+
end
27+
28+
test "raw Broadway.test_message without :sentry_test_owner is not auto-allowed" do
29+
ref = Broadway.test_message(PhoenixApp.TestBroadway, :capture)
30+
31+
assert_receive {:ack, ^ref, [_succeeded], []}, 5_000
32+
assert [] == Sentry.Test.pop_sentry_reports()
33+
end
34+
end
35+
36+
describe "without allowance" do
37+
setup do
38+
Sentry.Test.setup_sentry()
39+
start_supervised!(PhoenixApp.TestBroadway)
40+
:ok
41+
end
42+
43+
test "tagged messages are still dropped without allowance: [Broadway]" do
44+
ref =
45+
Broadway.test_message(PhoenixApp.TestBroadway, :capture,
46+
metadata: %{sentry_test_owner: self()}
47+
)
48+
49+
assert_receive {:ack, ^ref, [_succeeded], []}, 5_000
50+
assert [] == Sentry.Test.pop_sentry_reports()
51+
end
52+
end
53+
end
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
defmodule PhoenixApp.TestBroadway do
2+
@moduledoc false
3+
4+
use Broadway
5+
6+
def start_link(_opts \\ []) do
7+
Broadway.start_link(__MODULE__,
8+
name: __MODULE__,
9+
producer: [module: {Broadway.DummyProducer, []}],
10+
processors: [default: [concurrency: 1]]
11+
)
12+
end
13+
14+
@impl true
15+
def handle_message(_processor, %Broadway.Message{data: data} = message, _context) do
16+
case data do
17+
:capture ->
18+
Sentry.capture_message("from broadway", result: :sync)
19+
20+
:raise ->
21+
raise "broadway boom"
22+
23+
_ ->
24+
:ok
25+
end
26+
27+
message
28+
end
29+
end

0 commit comments

Comments
 (0)