Skip to content

Commit 260edad

Browse files
solnicclaude
andcommitted
feat(tests): support allowance: [Broadway] with metadata-tagged routing
Adds Broadway to the :allowance dispatch. The handler subscribes to [:broadway, :processor, :start] and [:broadway, :batch_processor, :start], reads the first :sentry_test_owner found in metadata.messages[*].metadata, and calls allow_sentry_reports/2 for that test pid. This follows the same shape Broadway documents for Ecto sandbox testing (metadata: %{ecto_sandbox: self()}) — no wrapper around Broadway.test_message/3, no hidden state, and async-safe by design because each message carries its origin test pid. Messages without :sentry_test_owner are silently skipped. Integration coverage adds a minimal Broadway pipeline (PhoenixApp.TestBroadway) and a 3-test broadway_test.exs in phoenix_app proving: (1) tagged + allowance captures, (2) untagged + allowance does not, (3) tagged + no allowance does not. async: true on the describe block to validate cross-test isolation through a shared pipeline. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent f6f8743 commit 260edad

6 files changed

Lines changed: 291 additions & 5 deletions

File tree

lib/sentry/test.ex

Lines changed: 77 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ defmodule Sentry.Test do
5353

5454
@moduledoc since: "10.2.0"
5555

56-
@compile {:no_warn_undefined, [Bypass, Plug.Conn, NimbleOwnership, :telemetry]}
56+
@compile {:no_warn_undefined, [Bypass, Plug.Conn, NimbleOwnership, :telemetry, Broadway]}
5757

5858
@ownership_server Sentry.Test.OwnershipServer
5959
@collector_key :sentry_test_scope
@@ -77,9 +77,9 @@ defmodule Sentry.Test do
7777
7878
## Options
7979
80-
* `:allowance` - a list of integration module atoms (currently `Oban`)
81-
to enable automatic `Sentry.Test.allow_sentry_reports/2` wiring for.
82-
See the "Oban tests" section below.
80+
* `:allowance` - a list of integration module atoms (currently `Oban`
81+
and `Broadway`) to enable automatic `Sentry.Test.allow_sentry_reports/2`
82+
wiring for. See the "Oban tests" and "Broadway tests" sections below.
8383
8484
Any other key is forwarded to the per-test Sentry config (e.g.,
8585
`dedup_events: false`, `traces_sample_rate: 1.0`).
@@ -111,6 +111,41 @@ defmodule Sentry.Test do
111111
jobs) are not auto-tagged and require manual
112112
`Sentry.Test.allow_sentry_reports/2`.
113113
114+
## Broadway tests
115+
116+
To route events from a Broadway processor or batch-processor back
117+
to your test, pass `:sentry_test_owner` in the message metadata
118+
when injecting messages via `Broadway.test_message/3` or
119+
`Broadway.test_batch/3`:
120+
121+
setup do
122+
Sentry.Test.setup_sentry(allowance: [Broadway])
123+
start_supervised!(MyPipeline)
124+
:ok
125+
end
126+
127+
test "captures events from the processor" do
128+
ref =
129+
Broadway.test_message(MyPipeline, payload,
130+
metadata: %{sentry_test_owner: self()}
131+
)
132+
133+
assert_receive {:ack, ^ref, [_succeeded], []}
134+
135+
assert [%Sentry.Event{}] = Sentry.Test.pop_sentry_reports()
136+
end
137+
138+
This mirrors the [Ecto sandbox pattern documented in
139+
Broadway](https://hexdocs.pm/broadway/Broadway.html#module-testing-with-ecto):
140+
the test owner travels with the message itself, so two `async: true`
141+
tests racing through the same pipeline are routed independently.
142+
143+
Messages submitted without the `:sentry_test_owner` metadata are not
144+
auto-allowed — the handler silently skips them. For production
145+
producers (Kafka, SQS, etc.) that need the same routing, attach the
146+
same key to the messages they emit; the handler reads it regardless
147+
of source.
148+
114149
## Examples
115150
116151
setup do
@@ -903,6 +938,17 @@ defmodule Sentry.Test do
903938
]
904939
end
905940

941+
defp allowance_handlers(Broadway) do
942+
# Both events fire once per worker invocation (per batch) in the
943+
# processor / batch-processor pid, with metadata.messages giving
944+
# the full batch. Reading the owner from message metadata is the
945+
# documented Broadway pattern (same shape as `ecto_sandbox`).
946+
[
947+
{[:broadway, :processor, :start], {__MODULE__, :__handle_broadway_batch_start__}},
948+
{[:broadway, :batch_processor, :start], {__MODULE__, :__handle_broadway_batch_start__}}
949+
]
950+
end
951+
906952
defp allowance_handlers(_other), do: :unknown
907953

908954
# ── Oban allowance handlers ──
@@ -951,6 +997,33 @@ defmodule Sentry.Test do
951997

952998
def __handle_oban_job_finish__(_event, _measurements, _metadata, _config), do: :ok
953999

1000+
# ── Broadway allowance handler ──
1001+
#
1002+
# The handler walks the batch's messages looking for a
1003+
# `:sentry_test_owner` metadata entry — the documented Broadway test
1004+
# pattern, identical in shape to the `:ecto_sandbox` example in the
1005+
# Broadway testing guide. Tests submit messages via
1006+
# `Broadway.test_message/3` with
1007+
# `metadata: %{sentry_test_owner: self()}` (or any custom producer
1008+
# that propagates `:metadata` onto `%Broadway.Message{}`).
1009+
@doc false
1010+
def __handle_broadway_batch_start__(_event, _measurements, %{messages: messages}, _config)
1011+
when is_list(messages) do
1012+
case find_broadway_owner(messages) do
1013+
nil -> :ok
1014+
owner_pid -> safe_allow(owner_pid, self())
1015+
end
1016+
end
1017+
1018+
def __handle_broadway_batch_start__(_event, _measurements, _metadata, _config), do: :ok
1019+
1020+
defp find_broadway_owner(messages) do
1021+
Enum.find_value(messages, fn
1022+
%{metadata: %{sentry_test_owner: pid}} when is_pid(pid) -> pid
1023+
_ -> nil
1024+
end)
1025+
end
1026+
9541027
# Best-effort allow used by the Oban / Broadway dispatch handlers.
9551028
# Swallows the `ArgumentError` that `allow_sentry_reports/2` raises
9561029
# when:

test/sentry/test_test.exs

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,134 @@ defmodule Sentry.TestTest do
479479
end
480480
end
481481

482+
describe "setup_sentry/1 with allowance: [Broadway] (synthetic events)" do
483+
setup do
484+
SentryTest.setup_sentry(allowance: [Broadway])
485+
end
486+
487+
test "processor batch start with a tagged message routes the worker" do
488+
test_pid = self()
489+
worker_done = make_ref()
490+
491+
worker =
492+
spawn(fn ->
493+
messages = [%{metadata: %{sentry_test_owner: test_pid}}]
494+
495+
:telemetry.execute(
496+
[:broadway, :processor, :start],
497+
%{},
498+
%{messages: messages}
499+
)
500+
501+
captured =
502+
case Sentry.capture_message("from broadway processor", result: :sync) do
503+
{:ok, _} -> :captured
504+
other -> {:unexpected, other}
505+
end
506+
507+
send(test_pid, {worker_done, captured})
508+
end)
509+
510+
ref = Process.monitor(worker)
511+
assert_receive {^worker_done, :captured}, 5_000
512+
assert_receive {:DOWN, ^ref, :process, ^worker, _}, 5_000
513+
514+
assert [%Sentry.Event{message: %{formatted: "from broadway processor"}}] =
515+
SentryTest.pop_sentry_reports()
516+
end
517+
518+
test "batch_processor start with a tagged message routes the worker" do
519+
test_pid = self()
520+
worker_done = make_ref()
521+
522+
worker =
523+
spawn(fn ->
524+
messages = [%{metadata: %{sentry_test_owner: test_pid}}]
525+
526+
:telemetry.execute(
527+
[:broadway, :batch_processor, :start],
528+
%{},
529+
%{messages: messages}
530+
)
531+
532+
captured =
533+
case Sentry.capture_message("from broadway batch", result: :sync) do
534+
{:ok, _} -> :captured
535+
other -> {:unexpected, other}
536+
end
537+
538+
send(test_pid, {worker_done, captured})
539+
end)
540+
541+
ref = Process.monitor(worker)
542+
assert_receive {^worker_done, :captured}, 5_000
543+
assert_receive {:DOWN, ^ref, :process, ^worker, _}, 5_000
544+
545+
assert [%Sentry.Event{message: %{formatted: "from broadway batch"}}] =
546+
SentryTest.pop_sentry_reports()
547+
end
548+
549+
test "messages without :sentry_test_owner metadata are not auto-allowed" do
550+
test_pid = self()
551+
worker_done = make_ref()
552+
553+
worker =
554+
spawn(fn ->
555+
messages = [%{metadata: %{some_other_key: :foo}}]
556+
557+
:telemetry.execute(
558+
[:broadway, :processor, :start],
559+
%{},
560+
%{messages: messages}
561+
)
562+
563+
captured =
564+
case Sentry.capture_message("untagged broadway", result: :sync) do
565+
{:ok, _} -> :captured
566+
other -> {:unexpected, other}
567+
end
568+
569+
send(test_pid, {worker_done, captured})
570+
end)
571+
572+
ref = Process.monitor(worker)
573+
assert_receive {^worker_done, :captured}, 5_000
574+
assert_receive {:DOWN, ^ref, :process, ^worker, _}, 5_000
575+
576+
assert [] == SentryTest.pop_sentry_reports()
577+
end
578+
579+
test "uses the first tagged message in a mixed batch" do
580+
test_pid = self()
581+
worker_done = make_ref()
582+
583+
worker =
584+
spawn(fn ->
585+
messages = [
586+
%{metadata: %{some_other_key: :foo}},
587+
%{metadata: %{sentry_test_owner: test_pid}},
588+
%{metadata: %{}}
589+
]
590+
591+
:telemetry.execute(
592+
[:broadway, :processor, :start],
593+
%{},
594+
%{messages: messages}
595+
)
596+
597+
Sentry.capture_message("mixed batch", result: :sync)
598+
send(test_pid, worker_done)
599+
end)
600+
601+
ref = Process.monitor(worker)
602+
assert_receive ^worker_done, 5_000
603+
assert_receive {:DOWN, ^ref, :process, ^worker, _}, 5_000
604+
605+
assert [%Sentry.Event{message: %{formatted: "mixed batch"}}] =
606+
SentryTest.pop_sentry_reports()
607+
end
608+
end
609+
482610
describe "before_send wrapping" do
483611
test "wraps existing before_send callback" do
484612
test_pid = self()

test_integrations/phoenix_app/mix.exs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ defmodule PhoenixApp.MixProject do
7575
{:opentelemetry_ecto, "~> 1.2"},
7676
{:opentelemetry_logger_metadata, "~> 0.2.0"},
7777
{:hackney, "~> 1.18"},
78-
{:oban, "~> 2.10"}
78+
{:oban, "~> 2.10"},
79+
{:broadway, "~> 1.0", only: [:test]}
7980
]
8081
end
8182

test_integrations/phoenix_app/mix.lock

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
%{
22
"acceptor_pool": {:hex, :acceptor_pool, "1.0.1", "d88c2e8a0be9216cf513fbcd3e5a4beb36bee3ff4168e85d6152c6f899359cdb", [:rebar3], [], "hexpm", "f172f3d74513e8edd445c257d596fc84dbdd56d2c6fa287434269648ae5a421e"},
33
"bandit": {:hex, :bandit, "1.10.3", "1e5d168fa79ec8de2860d1b4d878d97d4fbbe2fdbe7b0a7d9315a4359d1d4bb9", [:mix], [{:hpax, "~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.18", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "99a52d909c48db65ca598e1962797659e3c0f1d06e825a50c3d75b74a5e2db18"},
4+
"broadway": {:hex, :broadway, "1.3.0", "f75f6376159b74f55c5ba2629dac613e4fd79d9e71148ab5fbac8fdd7c999d2a", [:mix], [{:gen_stage, "~> 1.0", [hex: :gen_stage, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.3.7 or ~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "bef3b4c5512d0072917b70239cbecf8f76a2587465a5b7c3e2b9ae18b4bc405b"},
45
"bypass": {:hex, :bypass, "2.1.0", "909782781bf8e20ee86a9cabde36b259d44af8b9f38756173e8f5e2e1fabb9b1", [:mix], [{:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "d9b5df8fa5b7a6efa08384e9bbecfe4ce61c77d28a4282f79e02f1ef78d96b80"},
56
"castore": {:hex, :castore, "1.0.17", "4f9770d2d45fbd91dcf6bd404cf64e7e58fed04fadda0923dc32acca0badffa2", [:mix], [], "hexpm", "12d24b9d80b910dd3953e165636d68f147a31db945d2dcb9365e441f8b5351e5"},
67
"cc_precompiler": {:hex, :cc_precompiler, "0.1.11", "8c844d0b9fb98a3edea067f94f616b3f6b29b959b6b3bf25fee94ffe34364768", [:mix], [{:elixir_make, "~> 0.7", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "3427232caf0835f94680e5bcf082408a70b48ad68a5f5c0b02a3bea9f3a075b9"},
@@ -24,6 +25,7 @@
2425
"finch": {:hex, :finch, "0.21.0", "b1c3b2d48af02d0c66d2a9ebfb5622be5c5ecd62937cf79a88a7f98d48a8290c", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "87dc6e169794cb2570f75841a19da99cfde834249568f2a5b121b809588a4377"},
2526
"fine": {:hex, :fine, "0.1.4", "b19a89c1476c7c57afb5f9314aed5960b5bc95d5277de4cb5ee8e1d1616ce379", [:mix], [], "hexpm", "be3324cc454a42d80951cf6023b9954e9ff27c6daa255483b3e8d608670303f5"},
2627
"floki": {:hex, :floki, "0.38.0", "62b642386fa3f2f90713f6e231da0fa3256e41ef1089f83b6ceac7a3fd3abf33", [:mix], [], "hexpm", "a5943ee91e93fb2d635b612caf5508e36d37548e84928463ef9dd986f0d1abd9"},
28+
"gen_stage": {:hex, :gen_stage, "1.3.2", "7c77e5d1e97de2c6c2f78f306f463bca64bf2f4c3cdd606affc0100b89743b7b", [:mix], [], "hexpm", "0ffae547fa777b3ed889a6b9e1e64566217413d018cabd825f786e843ffe63e7"},
2729
"gettext": {:hex, :gettext, "0.26.2", "5978aa7b21fada6deabf1f6341ddba50bc69c999e812211903b169799208f2a8", [:mix], [{:expo, "~> 0.5.1 or ~> 1.0", [hex: :expo, repo: "hexpm", optional: false]}], "hexpm", "aa978504bcf76511efdc22d580ba08e2279caab1066b76bb9aa81c4a1e0a32a5"},
2830
"gproc": {:hex, :gproc, "0.9.1", "f1df0364423539cf0b80e8201c8b1839e229e5f9b3ccb944c5834626998f5b8c", [:rebar3], [], "hexpm", "905088e32e72127ed9466f0bac0d8e65704ca5e73ee5a62cb073c3117916d507"},
2931
"grpcbox": {:hex, :grpcbox, "0.17.1", "6e040ab3ef16fe699ffb513b0ef8e2e896da7b18931a1ef817143037c454bcce", [:rebar3], [{:acceptor_pool, "~> 1.0.0", [hex: :acceptor_pool, repo: "hexpm", optional: false]}, {:chatterbox, "~> 0.15.1", [hex: :ts_chatterbox, repo: "hexpm", optional: false]}, {:ctx, "~> 0.6.0", [hex: :ctx, repo: "hexpm", optional: false]}, {:gproc, "~> 0.9.1", [hex: :gproc, repo: "hexpm", optional: false]}], "hexpm", "4a3b5d7111daabc569dc9cbd9b202a3237d81c80bf97212fbc676832cb0ceb17"},
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
defmodule PhoenixApp.BroadwayTest do
2+
# async: true — the auto-allowance design routes per-message via the
3+
# :sentry_test_owner metadata, which Broadway propagates onto the
4+
# %Broadway.Message{} struct. Two of these tests racing against each
5+
# other on the same shared pipeline still produce the right per-test
6+
# results because each message carries its origin test pid.
7+
use ExUnit.Case, async: true
8+
9+
describe "setup_sentry/1 with allowance: [Broadway]" do
10+
setup do
11+
Sentry.Test.setup_sentry(allowance: [Broadway])
12+
start_supervised!(PhoenixApp.TestBroadway)
13+
:ok
14+
end
15+
16+
test "events from a Broadway processor are captured when tagged via metadata" do
17+
ref =
18+
Broadway.test_message(PhoenixApp.TestBroadway, :capture,
19+
metadata: %{sentry_test_owner: self()}
20+
)
21+
22+
assert_receive {:ack, ^ref, [_succeeded], []}, 5_000
23+
24+
assert [%Sentry.Event{message: %{formatted: "from broadway"}}] =
25+
Sentry.Test.pop_sentry_reports()
26+
end
27+
28+
test "raw Broadway.test_message without :sentry_test_owner is not auto-allowed" do
29+
ref = Broadway.test_message(PhoenixApp.TestBroadway, :capture)
30+
31+
assert_receive {:ack, ^ref, [_succeeded], []}, 5_000
32+
assert [] == Sentry.Test.pop_sentry_reports()
33+
end
34+
end
35+
36+
describe "without allowance" do
37+
setup do
38+
Sentry.Test.setup_sentry()
39+
start_supervised!(PhoenixApp.TestBroadway)
40+
:ok
41+
end
42+
43+
test "tagged messages are still dropped without allowance: [Broadway]" do
44+
ref =
45+
Broadway.test_message(PhoenixApp.TestBroadway, :capture,
46+
metadata: %{sentry_test_owner: self()}
47+
)
48+
49+
assert_receive {:ack, ^ref, [_succeeded], []}, 5_000
50+
assert [] == Sentry.Test.pop_sentry_reports()
51+
end
52+
end
53+
end
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
defmodule PhoenixApp.TestBroadway do
2+
@moduledoc false
3+
4+
use Broadway
5+
6+
def start_link(_opts \\ []) do
7+
Broadway.start_link(__MODULE__,
8+
name: __MODULE__,
9+
producer: [module: {Broadway.DummyProducer, []}],
10+
processors: [default: [concurrency: 1]]
11+
)
12+
end
13+
14+
@impl true
15+
def handle_message(_processor, %Broadway.Message{data: data} = message, _context) do
16+
case data do
17+
:capture ->
18+
Sentry.capture_message("from broadway", result: :sync)
19+
20+
:raise ->
21+
raise "broadway boom"
22+
23+
_ ->
24+
:ok
25+
end
26+
27+
message
28+
end
29+
end

0 commit comments

Comments
 (0)