Skip to content

Commit b622a35

Browse files
solnicclaude
andcommitted
feat(tests): support allowance: [Oban] with async-safe job tagging
Installs telemetry handlers that capture the test pid at Oban job insert time and route the worker's captured events back to that test on job start. The pairing makes auto-allowance safe under async: true even when multiple tests share an Oban supervisor, because every job is uniquely tied to the test that scheduled it (rather than to whichever telemetry handler happened to fire first). The tag store lives in Sentry.Test.Registry as a public ETS table (:sentry_test_oban_job_tags) alongside the existing scope-allow table. Tags are dropped on :stop/:exception and defensively on test exit so jobs that crash before completion don't leave stale entries behind. Handlers guard on `is_integer(job.id)` so synthetic jobs from inline mode or ad-hoc telemetry simulations (no persisted id) are skipped silently. The :inline / :manual Oban testing modes run jobs in the test pid anyway, so this auto-allowance is a no-op for them — it only adds capability for the production-like worker case from issue #1052. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 4d8b059 commit b622a35

4 files changed

Lines changed: 485 additions & 8 deletions

File tree

lib/sentry/test.ex

Lines changed: 110 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,9 @@ defmodule Sentry.Test do
7575
7676
## Options
7777
78-
* `:allowance` - a list of integration module atoms to enable automatic
79-
`Sentry.Test.allow_sentry_reports/2` wiring for. The integrations land
80-
in follow-up commits; see the integration-specific sections below for
81-
supported entries.
78+
* `:allowance` - a list of integration module atoms (currently `Oban`)
79+
to enable automatic `Sentry.Test.allow_sentry_reports/2` wiring for.
80+
See the "Oban tests" section below.
8281
8382
Any other key is forwarded to the per-test Sentry config (e.g.,
8483
`dedup_events: false`, `traces_sample_rate: 1.0`).
@@ -99,6 +98,32 @@ defmodule Sentry.Test do
9998
10099
This collapses the common `bypass = setup_sentry(...); ref =
101100
setup_bypass_envelope_collector(bypass)` two-step into one call.
101+
## Oban tests
102+
103+
When you run Oban in `:inline` or `:manual` mode (per the
104+
[Oban testing guide](https://hexdocs.pm/oban/testing.html)), jobs
105+
execute synchronously in the calling process and `Sentry.Test`
106+
captures their events automatically — no `:allowance` option needed.
107+
108+
Use `allowance: [Oban]` when your test exercises a real Oban
109+
supervisor with worker processes (the production-like setup that
110+
issue #1052 was filed for). The option installs telemetry handlers
111+
that tag jobs at insert time and route the worker's captured events
112+
back to the inserting test:
113+
114+
setup do
115+
Sentry.Test.setup_sentry(allowance: [Oban])
116+
end
117+
118+
test "captures events from a real Oban worker" do
119+
{:ok, _} = Oban.insert(MyWorker.new(%{}))
120+
# ... wait for the worker to run ...
121+
assert [%Sentry.Event{}] = Sentry.Test.pop_sentry_reports()
122+
end
123+
124+
Jobs inserted by other processes (cron plugins, jobs scheduling
125+
jobs) are not auto-tagged and require manual
126+
`Sentry.Test.allow_sentry_reports/2`.
102127
103128
## Examples
104129
@@ -914,10 +939,82 @@ defmodule Sentry.Test do
914939

915940
# Returns the list of `{event_path, {module, function}}` handler pairs for
916941
# a given integration atom, or `:unknown` for unsupported entries (the
917-
# caller turns that into an `ArgumentError`). Commits 2 and 3 prepend
918-
# clauses for `Oban` and `Broadway` respectively.
942+
# caller turns that into an `ArgumentError`).
943+
defp allowance_handlers(Oban) do
944+
[
945+
{[:oban, :engine, :insert_job, :stop], {__MODULE__, :__handle_oban_insert_job__}},
946+
{[:oban, :engine, :insert_all_jobs, :stop], {__MODULE__, :__handle_oban_insert_all_jobs__}},
947+
{[:oban, :job, :start], {__MODULE__, :__handle_oban_job_start__}},
948+
{[:oban, :job, :stop], {__MODULE__, :__handle_oban_job_finish__}},
949+
{[:oban, :job, :exception], {__MODULE__, :__handle_oban_job_finish__}}
950+
]
951+
end
952+
919953
defp allowance_handlers(_other), do: :unknown
920954

955+
# ── Oban allowance handlers ──
956+
#
957+
# Guards on `is_integer(id)` so synthetic jobs from `:inline` mode or
958+
# ad-hoc telemetry simulations (no persisted id) are silently skipped —
959+
# keeps the handlers safe to install in any test config.
960+
961+
@doc false
962+
def __handle_oban_insert_job__(_event, _measurements, %{job: %{id: id}}, _config)
963+
when is_integer(id) do
964+
Sentry.Test.Registry.tag_oban_job(id, self())
965+
end
966+
967+
def __handle_oban_insert_job__(_event, _measurements, _metadata, _config), do: :ok
968+
969+
@doc false
970+
def __handle_oban_insert_all_jobs__(_event, _measurements, %{jobs: jobs}, _config)
971+
when is_list(jobs) do
972+
pid = self()
973+
974+
Enum.each(jobs, fn
975+
%{id: id} when is_integer(id) -> Sentry.Test.Registry.tag_oban_job(id, pid)
976+
_ -> :ok
977+
end)
978+
end
979+
980+
def __handle_oban_insert_all_jobs__(_event, _measurements, _metadata, _config), do: :ok
981+
982+
@doc false
983+
def __handle_oban_job_start__(_event, _measurements, %{job: %{id: id}}, _config)
984+
when is_integer(id) do
985+
case Sentry.Test.Registry.lookup_oban_job(id) do
986+
nil -> :ok
987+
test_pid -> safe_allow(test_pid, self())
988+
end
989+
end
990+
991+
def __handle_oban_job_start__(_event, _measurements, _metadata, _config), do: :ok
992+
993+
@doc false
994+
def __handle_oban_job_finish__(_event, _measurements, %{job: %{id: id}}, _config)
995+
when is_integer(id) do
996+
Sentry.Test.Registry.untag_oban_job(id)
997+
end
998+
999+
def __handle_oban_job_finish__(_event, _measurements, _metadata, _config), do: :ok
1000+
1001+
# Best-effort allow used by the Oban / Broadway dispatch handlers.
1002+
# Swallows the `ArgumentError` that `allow_sentry_reports/2` raises
1003+
# when:
1004+
#
1005+
# * the would-be worker pid is already allowed by another live test
1006+
# (concurrent tests racing on a shared worker — first wins);
1007+
# * the worker pid is the test pid itself (e.g. Oban :manual mode
1008+
# with `drain_queue/2` — `$callers` already routes the events);
1009+
# * the owner pid is no longer collecting (test exited between
1010+
# insert and start).
1011+
defp safe_allow(owner_pid, allowed_pid)
1012+
when is_pid(owner_pid) and is_pid(allowed_pid) do
1013+
allow_sentry_reports(owner_pid, allowed_pid)
1014+
rescue
1015+
ArgumentError -> :ok
1016+
end
1017+
9211018
# Sets up collection infrastructure (ETS table, before_send wrapping, config)
9221019
# without opening a new Bypass. When no :dsn is provided in extra_config,
9231020
# falls back to the default Bypass DSN from Registry.
@@ -982,7 +1079,11 @@ defmodule Sentry.Test do
9821079
# cleans up the key and allowances automatically when this test exits.
9831080
# Drop any worker→processor routing rows that point at this test's
9841081
# processor so a test that exits before its allowed pids do not
985-
# leave stale rows pointing at a stopped per-test processor.
1082+
# leave stale rows pointing at a stopped per-test processor. Also
1083+
# defensively drop any Oban job tags owned by this test in case a
1084+
# job crashed before emitting a `:stop` / `:exception` event — leaves a
1085+
# stale row pointing at a dead pid otherwise.
1086+
test_pid = self()
9861087
processor_name = Process.get(:sentry_telemetry_processor)
9871088

9881089
ExUnit.Callbacks.on_exit(fn ->
@@ -993,6 +1094,8 @@ defmodule Sentry.Test do
9931094
if is_atom(processor_name) and not is_nil(processor_name) do
9941095
Sentry.Test.Registry.drop_processor_routing_for(processor_name)
9951096
end
1097+
1098+
Sentry.Test.Registry.drop_oban_tags_for(test_pid)
9961099
end)
9971100

9981101
:ok

lib/sentry/test/registry.ex

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ defmodule Sentry.Test.Registry do
2020
# `tag_processor_for/2` sets it.
2121
@routing_table :sentry_test_pid_routing
2222

23+
# Separate ETS table tagging Oban job ids to the test pid that
24+
# scheduled them, used by the Oban auto-allowance integration.
25+
@oban_jobs_table :sentry_test_oban_job_tags
26+
2327
@spec start_link(keyword()) :: GenServer.on_start()
2428
def start_link([] = _opts) do
2529
GenServer.start_link(__MODULE__, nil, name: __MODULE__)
@@ -236,9 +240,77 @@ defmodule Sentry.Test.Registry do
236240
:ok
237241
end
238242

243+
@doc """
244+
Tags an inserted Oban job with the pid of the process that scheduled
245+
it. Used by the `allowance: [Oban]` telemetry handlers in
246+
`Sentry.Test` to route a worker's captured events back to the
247+
inserting test under `async: true`.
248+
249+
Direct ETS write — atomic, no GenServer round-trip.
250+
"""
251+
@spec tag_oban_job(integer(), pid()) :: :ok
252+
def tag_oban_job(job_id, owner_pid)
253+
when is_integer(job_id) and is_pid(owner_pid) do
254+
if :ets.whereis(@oban_jobs_table) != :undefined do
255+
:ets.insert(@oban_jobs_table, {job_id, owner_pid})
256+
end
257+
258+
:ok
259+
end
260+
261+
@doc """
262+
Returns the pid that tagged `job_id`, or `nil` if the tag is missing
263+
or the tagging pid is no longer alive.
264+
"""
265+
@spec lookup_oban_job(integer()) :: pid() | nil
266+
def lookup_oban_job(job_id) when is_integer(job_id) do
267+
case :ets.whereis(@oban_jobs_table) do
268+
:undefined ->
269+
nil
270+
271+
_ ->
272+
case :ets.lookup(@oban_jobs_table, job_id) do
273+
[{^job_id, pid}] when is_pid(pid) ->
274+
if Process.alive?(pid), do: pid, else: nil
275+
276+
[] ->
277+
nil
278+
end
279+
end
280+
end
281+
282+
@doc """
283+
Removes the tag for `job_id`. Called from the `:oban, :job, :stop`
284+
and `:oban, :job, :exception` handlers.
285+
"""
286+
@spec untag_oban_job(integer()) :: :ok
287+
def untag_oban_job(job_id) when is_integer(job_id) do
288+
if :ets.whereis(@oban_jobs_table) != :undefined do
289+
:ets.delete(@oban_jobs_table, job_id)
290+
end
291+
292+
:ok
293+
end
294+
295+
@doc """
296+
Removes every tag whose owner is `owner_pid`. Used by
297+
`setup_collector/1`'s `on_exit/1` cleanup so jobs that crashed
298+
before emitting a `:stop`/`:exception` event don't leave stale tags
299+
behind.
300+
"""
301+
@spec drop_oban_tags_for(pid()) :: :ok
302+
def drop_oban_tags_for(owner_pid) when is_pid(owner_pid) do
303+
if :ets.whereis(@oban_jobs_table) != :undefined do
304+
:ets.match_delete(@oban_jobs_table, {:_, owner_pid})
305+
end
306+
307+
:ok
308+
end
309+
239310
@impl true
240311
def init(nil) do
241312
_routing_table = :ets.new(@routing_table, [:named_table, :public, :set])
313+
_oban_jobs_table = :ets.new(@oban_jobs_table, [:named_table, :public, :set])
242314
maybe_start_default_bypass()
243315
{:ok, %{owner_monitors: %{}}}
244316
end

test/sentry/test_test.exs

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,143 @@ defmodule Sentry.TestTest do
416416
end
417417
end
418418

419+
describe "setup_sentry/1 with allowance: [Oban] (synthetic events)" do
420+
setup do
421+
SentryTest.setup_sentry(allowance: [Oban])
422+
end
423+
424+
test "tags the job at insert time and routes the worker on start" do
425+
test_pid = self()
426+
job = %{id: System.unique_integer([:positive])}
427+
428+
:telemetry.execute([:oban, :engine, :insert_job, :stop], %{}, %{job: job})
429+
assert Sentry.Test.Registry.lookup_oban_job(job.id) == test_pid
430+
431+
worker_done = make_ref()
432+
433+
# Raw spawn/1 — does NOT propagate $callers, so the worker has no
434+
# caller-chain link back to the test. The tag store is the only
435+
# path that can route this worker's events to the test's collector.
436+
worker =
437+
spawn(fn ->
438+
:telemetry.execute([:oban, :job, :start], %{}, %{job: job})
439+
440+
captured =
441+
case Sentry.capture_message("oban hello", result: :sync) do
442+
{:ok, _} -> :captured
443+
other -> {:unexpected, other}
444+
end
445+
446+
send(test_pid, {worker_done, captured})
447+
end)
448+
449+
ref = Process.monitor(worker)
450+
assert_receive {^worker_done, :captured}, 5_000
451+
assert_receive {:DOWN, ^ref, :process, ^worker, _}, 5_000
452+
453+
assert [%Sentry.Event{message: %{formatted: "oban hello"}}] =
454+
SentryTest.pop_sentry_reports()
455+
end
456+
457+
test "ignores jobs that were not tagged at insert time" do
458+
test_pid = self()
459+
job = %{id: System.unique_integer([:positive])}
460+
worker_done = make_ref()
461+
462+
worker =
463+
spawn(fn ->
464+
:telemetry.execute([:oban, :job, :start], %{}, %{job: job})
465+
466+
captured =
467+
case Sentry.capture_message("untagged", result: :sync) do
468+
{:ok, _} -> :captured
469+
other -> {:unexpected, other}
470+
end
471+
472+
send(test_pid, {worker_done, captured})
473+
end)
474+
475+
ref = Process.monitor(worker)
476+
assert_receive {^worker_done, :captured}, 5_000
477+
assert_receive {:DOWN, ^ref, :process, ^worker, _}, 5_000
478+
479+
assert [] == SentryTest.pop_sentry_reports()
480+
end
481+
482+
test "untags the job on :stop" do
483+
job = %{id: System.unique_integer([:positive])}
484+
:telemetry.execute([:oban, :engine, :insert_job, :stop], %{}, %{job: job})
485+
assert Sentry.Test.Registry.lookup_oban_job(job.id) == self()
486+
487+
:telemetry.execute([:oban, :job, :stop], %{}, %{job: job})
488+
refute Sentry.Test.Registry.lookup_oban_job(job.id)
489+
end
490+
491+
test "untags the job on :exception" do
492+
job = %{id: System.unique_integer([:positive])}
493+
:telemetry.execute([:oban, :engine, :insert_job, :stop], %{}, %{job: job})
494+
assert Sentry.Test.Registry.lookup_oban_job(job.id) == self()
495+
496+
:telemetry.execute([:oban, :job, :exception], %{}, %{job: job})
497+
refute Sentry.Test.Registry.lookup_oban_job(job.id)
498+
end
499+
500+
test "insert_all_jobs tags every job in the batch" do
501+
jobs = [
502+
%{id: System.unique_integer([:positive])},
503+
%{id: System.unique_integer([:positive])}
504+
]
505+
506+
:telemetry.execute([:oban, :engine, :insert_all_jobs, :stop], %{}, %{jobs: jobs})
507+
508+
for job <- jobs do
509+
assert Sentry.Test.Registry.lookup_oban_job(job.id) == self()
510+
end
511+
end
512+
513+
test "silently ignores synthetic jobs without an integer id" do
514+
# :inline mode jobs / ad-hoc telemetry simulations may carry no id.
515+
:telemetry.execute([:oban, :engine, :insert_job, :stop], %{}, %{job: %{id: nil}})
516+
:telemetry.execute([:oban, :job, :start], %{}, %{job: %{id: nil}})
517+
:ok
518+
end
519+
520+
test "two concurrent test scopes are routed independently" do
521+
test_pid = self()
522+
job_for_me = %{id: System.unique_integer([:positive])}
523+
job_for_peer = %{id: System.unique_integer([:positive])}
524+
525+
# Spawn a peer that acts as a separate live owner via NimbleOwnership,
526+
# tags its own Oban job, and reports back when ready.
527+
peer =
528+
spawn(fn ->
529+
{:ok, _} =
530+
NimbleOwnership.get_and_update(
531+
Sentry.Test.OwnershipServer,
532+
self(),
533+
:sentry_test_collector,
534+
fn _ -> {:ok, :peer_table} end
535+
)
536+
537+
Sentry.Test.Registry.tag_oban_job(job_for_peer.id, self())
538+
send(test_pid, :claimed)
539+
540+
receive do
541+
:exit -> :ok
542+
end
543+
end)
544+
545+
on_exit(fn -> Process.exit(peer, :kill) end)
546+
assert_receive :claimed, 5_000
547+
548+
# Tag my own job.
549+
:telemetry.execute([:oban, :engine, :insert_job, :stop], %{}, %{job: job_for_me})
550+
551+
assert Sentry.Test.Registry.lookup_oban_job(job_for_me.id) == test_pid
552+
assert Sentry.Test.Registry.lookup_oban_job(job_for_peer.id) == peer
553+
end
554+
end
555+
419556
describe "before_send wrapping" do
420557
test "wraps existing before_send callback" do
421558
test_pid = self()

0 commit comments

Comments
 (0)