Skip to content

Commit f6f8743

Browse files
solnicclaude
andcommitted
feat(tests): support allowance: [Oban] with async-safe job tagging
Installs telemetry handlers that capture the test pid at Oban job insert time and route the worker's captured events back to that test on job start. The pairing makes auto-allowance safe under async: true even when multiple tests share an Oban supervisor, because every job is uniquely tied to the test that scheduled it (rather than to whichever telemetry handler happened to fire first). The tag store lives in Sentry.Test.Registry as a public ETS table (:sentry_test_oban_job_tags) alongside the existing scope-allow table. Tags are dropped on :stop/:exception and defensively on test exit so jobs that crash before completion don't leave stale entries behind. Handlers guard on `is_integer(job.id)` so synthetic jobs from inline mode or ad-hoc telemetry simulations (no persisted id) are skipped silently. The :inline / :manual Oban testing modes run jobs in the test pid anyway, so this auto-allowance is a no-op for them — it only adds capability for the production-like worker case from issue #1052. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 5b49637 commit f6f8743

4 files changed

Lines changed: 488 additions & 10 deletions

File tree

lib/sentry/test.ex

Lines changed: 111 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,40 @@ defmodule Sentry.Test do
7777
7878
## Options
7979
80-
* `:allowance` - a list of integration module atoms to enable automatic
81-
`Sentry.Test.allow_sentry_reports/2` wiring for. The integrations land
82-
in follow-up commits; see the integration-specific sections below for
83-
supported entries.
80+
* `:allowance` - a list of integration module atoms (currently `Oban`)
81+
to enable automatic `Sentry.Test.allow_sentry_reports/2` wiring for.
82+
See the "Oban tests" section below.
8483
8584
Any other key is forwarded to the per-test Sentry config (e.g.,
8685
`dedup_events: false`, `traces_sample_rate: 1.0`).
8786
87+
## Oban tests
88+
89+
When you run Oban in `:inline` or `:manual` mode (per the
90+
[Oban testing guide](https://hexdocs.pm/oban/testing.html)), jobs
91+
execute synchronously in the calling process and `Sentry.Test`
92+
captures their events automatically — no `:allowance` option needed.
93+
94+
Use `allowance: [Oban]` when your test exercises a real Oban
95+
supervisor with worker processes (the production-like setup that
96+
issue #1052 was filed for). The option installs telemetry handlers
97+
that tag jobs at insert time and route the worker's captured events
98+
back to the inserting test:
99+
100+
setup do
101+
Sentry.Test.setup_sentry(allowance: [Oban])
102+
end
103+
104+
test "captures events from a real Oban worker" do
105+
{:ok, _} = Oban.insert(MyWorker.new(%{}))
106+
# ... wait for the worker to run ...
107+
assert [%Sentry.Event{}] = Sentry.Test.pop_sentry_reports()
108+
end
109+
110+
Jobs inserted by other processes (cron plugins, jobs scheduling
111+
jobs) are not auto-tagged and require manual
112+
`Sentry.Test.allow_sentry_reports/2`.
113+
88114
## Examples
89115
90116
setup do
@@ -866,10 +892,82 @@ defmodule Sentry.Test do
866892

867893
# Returns the list of `{event_path, {module, function}}` handler pairs for
868894
# a given integration atom, or `:unknown` for unsupported entries (the
869-
# caller turns that into an `ArgumentError`). Commits 2 and 3 prepend
870-
# clauses for `Oban` and `Broadway` respectively.
895+
# caller turns that into an `ArgumentError`).
896+
defp allowance_handlers(Oban) do
897+
[
898+
{[:oban, :engine, :insert_job, :stop], {__MODULE__, :__handle_oban_insert_job__}},
899+
{[:oban, :engine, :insert_all_jobs, :stop], {__MODULE__, :__handle_oban_insert_all_jobs__}},
900+
{[:oban, :job, :start], {__MODULE__, :__handle_oban_job_start__}},
901+
{[:oban, :job, :stop], {__MODULE__, :__handle_oban_job_finish__}},
902+
{[:oban, :job, :exception], {__MODULE__, :__handle_oban_job_finish__}}
903+
]
904+
end
905+
871906
defp allowance_handlers(_other), do: :unknown
872907

908+
# ── Oban allowance handlers ──
909+
#
910+
# Guards on `is_integer(id)` so synthetic jobs from `:inline` mode or
911+
# ad-hoc telemetry simulations (no persisted id) are silently skipped —
912+
# keeps the handlers safe to install in any test config.
913+
914+
@doc false
915+
def __handle_oban_insert_job__(_event, _measurements, %{job: %{id: id}}, _config)
916+
when is_integer(id) do
917+
Sentry.Test.Registry.tag_oban_job(id, self())
918+
end
919+
920+
def __handle_oban_insert_job__(_event, _measurements, _metadata, _config), do: :ok
921+
922+
@doc false
923+
def __handle_oban_insert_all_jobs__(_event, _measurements, %{jobs: jobs}, _config)
924+
when is_list(jobs) do
925+
pid = self()
926+
927+
Enum.each(jobs, fn
928+
%{id: id} when is_integer(id) -> Sentry.Test.Registry.tag_oban_job(id, pid)
929+
_ -> :ok
930+
end)
931+
end
932+
933+
def __handle_oban_insert_all_jobs__(_event, _measurements, _metadata, _config), do: :ok
934+
935+
@doc false
936+
def __handle_oban_job_start__(_event, _measurements, %{job: %{id: id}}, _config)
937+
when is_integer(id) do
938+
case Sentry.Test.Registry.lookup_oban_job(id) do
939+
nil -> :ok
940+
test_pid -> safe_allow(test_pid, self())
941+
end
942+
end
943+
944+
def __handle_oban_job_start__(_event, _measurements, _metadata, _config), do: :ok
945+
946+
@doc false
947+
def __handle_oban_job_finish__(_event, _measurements, %{job: %{id: id}}, _config)
948+
when is_integer(id) do
949+
Sentry.Test.Registry.untag_oban_job(id)
950+
end
951+
952+
def __handle_oban_job_finish__(_event, _measurements, _metadata, _config), do: :ok
953+
954+
# Best-effort allow used by the Oban / Broadway dispatch handlers.
955+
# Swallows the `ArgumentError` that `allow_sentry_reports/2` raises
956+
# when:
957+
#
958+
# * the would-be worker pid is already allowed by another live test
959+
# (concurrent tests racing on a shared worker — first wins);
960+
# * the worker pid is the test pid itself (e.g. Oban :manual mode
961+
# with `drain_queue/2` — `$callers` already routes the events);
962+
# * the owner pid is no longer collecting (test exited between
963+
# insert and start).
964+
defp safe_allow(owner_pid, allowed_pid)
965+
when is_pid(owner_pid) and is_pid(allowed_pid) do
966+
allow_sentry_reports(owner_pid, allowed_pid)
967+
rescue
968+
ArgumentError -> :ok
969+
end
970+
873971
# Sets up collection infrastructure (ETS table, before_send wrapping, config)
874972
# without opening a new Bypass. When no :dsn is provided in extra_config,
875973
# falls back to the default Bypass DSN from Registry.
@@ -935,7 +1033,12 @@ defmodule Sentry.Test do
9351033
# cleans up the key and allowances automatically when this test exits.
9361034
# Drop any worker→processor routing rows that point at this test's
9371035
# processor so a test that exits before its allowed pids do does not
938-
# leave stale rows pointing at a stopped per-test processor.
1036+
# leave stale rows pointing at a stopped per-test processor. Also
1037+
# defensively drop any Oban job tags owned by this test in case a
1038+
# job crashed before emitting a `:stop` / `:exception` event — leaves a
1039+
# stale row pointing at a dead pid otherwise.
1040+
test_pid = self()
1041+
9391042
processor_name =
9401043
Process.get(:sentry_telemetry_processor, Sentry.TelemetryProcessor.default_name())
9411044

@@ -945,6 +1048,7 @@ defmodule Sentry.Test do
9451048
end
9461049

9471050
Sentry.Test.Registry.drop_processor_routing_for(processor_name)
1051+
Sentry.Test.Registry.drop_oban_tags_for(test_pid)
9481052
end)
9491053

9501054
:ok

lib/sentry/test/registry.ex

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ defmodule Sentry.Test.Registry do
2020
# shape keeps `:ets.match_delete` patterns simple.
2121
@routing_table :sentry_test_pid_routing
2222

23+
# Separate ETS table tagging Oban job ids to the test pid that
24+
# scheduled them, used by the Oban auto-allowance integration.
25+
@oban_jobs_table :sentry_test_oban_job_tags
26+
2327
@spec start_link(keyword()) :: GenServer.on_start()
2428
def start_link([] = _opts) do
2529
GenServer.start_link(__MODULE__, nil, name: __MODULE__)
@@ -163,9 +167,77 @@ defmodule Sentry.Test.Registry do
163167
:ok
164168
end
165169

170+
@doc """
171+
Tags an inserted Oban job with the pid of the process that scheduled
172+
it. Used by the `allowance: [Oban]` telemetry handlers in
173+
`Sentry.Test` to route a worker's captured events back to the
174+
inserting test under `async: true`.
175+
176+
Direct ETS write — atomic, no GenServer round-trip.
177+
"""
178+
@spec tag_oban_job(integer(), pid()) :: :ok
179+
def tag_oban_job(job_id, owner_pid)
180+
when is_integer(job_id) and is_pid(owner_pid) do
181+
if :ets.whereis(@oban_jobs_table) != :undefined do
182+
:ets.insert(@oban_jobs_table, {job_id, owner_pid})
183+
end
184+
185+
:ok
186+
end
187+
188+
@doc """
189+
Returns the pid that tagged `job_id`, or `nil` if the tag is missing
190+
or the tagging pid is no longer alive.
191+
"""
192+
@spec lookup_oban_job(integer()) :: pid() | nil
193+
def lookup_oban_job(job_id) when is_integer(job_id) do
194+
case :ets.whereis(@oban_jobs_table) do
195+
:undefined ->
196+
nil
197+
198+
_ ->
199+
case :ets.lookup(@oban_jobs_table, job_id) do
200+
[{^job_id, pid}] when is_pid(pid) ->
201+
if Process.alive?(pid), do: pid, else: nil
202+
203+
[] ->
204+
nil
205+
end
206+
end
207+
end
208+
209+
@doc """
210+
Removes the tag for `job_id`. Called from the `:oban, :job, :stop`
211+
and `:oban, :job, :exception` handlers.
212+
"""
213+
@spec untag_oban_job(integer()) :: :ok
214+
def untag_oban_job(job_id) when is_integer(job_id) do
215+
if :ets.whereis(@oban_jobs_table) != :undefined do
216+
:ets.delete(@oban_jobs_table, job_id)
217+
end
218+
219+
:ok
220+
end
221+
222+
@doc """
223+
Removes every tag whose owner is `owner_pid`. Used by
224+
`setup_collector/1`'s `on_exit/1` cleanup so jobs that crashed
225+
before emitting a `:stop`/`:exception` event don't leave stale tags
226+
behind.
227+
"""
228+
@spec drop_oban_tags_for(pid()) :: :ok
229+
def drop_oban_tags_for(owner_pid) when is_pid(owner_pid) do
230+
if :ets.whereis(@oban_jobs_table) != :undefined do
231+
:ets.match_delete(@oban_jobs_table, {:_, owner_pid})
232+
end
233+
234+
:ok
235+
end
236+
166237
@impl true
167238
def init(nil) do
168239
_routing_table = :ets.new(@routing_table, [:named_table, :public, :set])
240+
_oban_jobs_table = :ets.new(@oban_jobs_table, [:named_table, :public, :set])
169241
maybe_start_default_bypass()
170242
{:ok, %{owner_monitors: %{}}}
171243
end

test/sentry/test_test.exs

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,143 @@ defmodule Sentry.TestTest do
342342
end
343343
end
344344

345+
describe "setup_sentry/1 with allowance: [Oban] (synthetic events)" do
346+
setup do
347+
SentryTest.setup_sentry(allowance: [Oban])
348+
end
349+
350+
test "tags the job at insert time and routes the worker on start" do
351+
test_pid = self()
352+
job = %{id: System.unique_integer([:positive])}
353+
354+
:telemetry.execute([:oban, :engine, :insert_job, :stop], %{}, %{job: job})
355+
assert Sentry.Test.Registry.lookup_oban_job(job.id) == test_pid
356+
357+
worker_done = make_ref()
358+
359+
# Raw spawn/1 — does NOT propagate $callers, so the worker has no
360+
# caller-chain link back to the test. The tag store is the only
361+
# path that can route this worker's events to the test's collector.
362+
worker =
363+
spawn(fn ->
364+
:telemetry.execute([:oban, :job, :start], %{}, %{job: job})
365+
366+
captured =
367+
case Sentry.capture_message("oban hello", result: :sync) do
368+
{:ok, _} -> :captured
369+
other -> {:unexpected, other}
370+
end
371+
372+
send(test_pid, {worker_done, captured})
373+
end)
374+
375+
ref = Process.monitor(worker)
376+
assert_receive {^worker_done, :captured}, 5_000
377+
assert_receive {:DOWN, ^ref, :process, ^worker, _}, 5_000
378+
379+
assert [%Sentry.Event{message: %{formatted: "oban hello"}}] =
380+
SentryTest.pop_sentry_reports()
381+
end
382+
383+
test "ignores jobs that were not tagged at insert time" do
384+
test_pid = self()
385+
job = %{id: System.unique_integer([:positive])}
386+
worker_done = make_ref()
387+
388+
worker =
389+
spawn(fn ->
390+
:telemetry.execute([:oban, :job, :start], %{}, %{job: job})
391+
392+
captured =
393+
case Sentry.capture_message("untagged", result: :sync) do
394+
{:ok, _} -> :captured
395+
other -> {:unexpected, other}
396+
end
397+
398+
send(test_pid, {worker_done, captured})
399+
end)
400+
401+
ref = Process.monitor(worker)
402+
assert_receive {^worker_done, :captured}, 5_000
403+
assert_receive {:DOWN, ^ref, :process, ^worker, _}, 5_000
404+
405+
assert [] == SentryTest.pop_sentry_reports()
406+
end
407+
408+
test "untags the job on :stop" do
409+
job = %{id: System.unique_integer([:positive])}
410+
:telemetry.execute([:oban, :engine, :insert_job, :stop], %{}, %{job: job})
411+
assert Sentry.Test.Registry.lookup_oban_job(job.id) == self()
412+
413+
:telemetry.execute([:oban, :job, :stop], %{}, %{job: job})
414+
refute Sentry.Test.Registry.lookup_oban_job(job.id)
415+
end
416+
417+
test "untags the job on :exception" do
418+
job = %{id: System.unique_integer([:positive])}
419+
:telemetry.execute([:oban, :engine, :insert_job, :stop], %{}, %{job: job})
420+
assert Sentry.Test.Registry.lookup_oban_job(job.id) == self()
421+
422+
:telemetry.execute([:oban, :job, :exception], %{}, %{job: job})
423+
refute Sentry.Test.Registry.lookup_oban_job(job.id)
424+
end
425+
426+
test "insert_all_jobs tags every job in the batch" do
427+
jobs = [
428+
%{id: System.unique_integer([:positive])},
429+
%{id: System.unique_integer([:positive])}
430+
]
431+
432+
:telemetry.execute([:oban, :engine, :insert_all_jobs, :stop], %{}, %{jobs: jobs})
433+
434+
for job <- jobs do
435+
assert Sentry.Test.Registry.lookup_oban_job(job.id) == self()
436+
end
437+
end
438+
439+
test "silently ignores synthetic jobs without an integer id" do
440+
# :inline mode jobs / ad-hoc telemetry simulations may carry no id.
441+
:telemetry.execute([:oban, :engine, :insert_job, :stop], %{}, %{job: %{id: nil}})
442+
:telemetry.execute([:oban, :job, :start], %{}, %{job: %{id: nil}})
443+
:ok
444+
end
445+
446+
test "two concurrent test scopes are routed independently" do
447+
test_pid = self()
448+
job_for_me = %{id: System.unique_integer([:positive])}
449+
job_for_peer = %{id: System.unique_integer([:positive])}
450+
451+
# Spawn a peer that acts as a separate live owner via NimbleOwnership,
452+
# tags its own Oban job, and reports back when ready.
453+
peer =
454+
spawn(fn ->
455+
{:ok, _} =
456+
NimbleOwnership.get_and_update(
457+
Sentry.Test.OwnershipServer,
458+
self(),
459+
:sentry_test_collector,
460+
fn _ -> {:ok, :peer_table} end
461+
)
462+
463+
Sentry.Test.Registry.tag_oban_job(job_for_peer.id, self())
464+
send(test_pid, :claimed)
465+
466+
receive do
467+
:exit -> :ok
468+
end
469+
end)
470+
471+
on_exit(fn -> Process.exit(peer, :kill) end)
472+
assert_receive :claimed, 5_000
473+
474+
# Tag my own job.
475+
:telemetry.execute([:oban, :engine, :insert_job, :stop], %{}, %{job: job_for_me})
476+
477+
assert Sentry.Test.Registry.lookup_oban_job(job_for_me.id) == test_pid
478+
assert Sentry.Test.Registry.lookup_oban_job(job_for_peer.id) == peer
479+
end
480+
end
481+
345482
describe "before_send wrapping" do
346483
test "wraps existing before_send callback" do
347484
test_pid = self()

0 commit comments

Comments
 (0)