Skip to content

Commit 611a647

Browse files
committed
Distinguish queue producer and consumer error mechanisms
1 parent 3fd7452 commit 611a647

File tree

2 files changed

+65
-6
lines changed

2 files changed

+65
-6
lines changed

packages/core/src/integrations/supabase.ts

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -611,14 +611,15 @@ function calculateMessageBodySize(message: unknown): number | undefined {
611611
function captureQueueError(
612612
error: { message: string; code?: string; details?: unknown },
613613
queueName: string | undefined,
614+
mechanismType: string,
614615
messageId?: string,
615616
extraContext?: Record<string, unknown>,
616617
): void {
617618
const err = new Error(error.message) as SupabaseError;
618619
if (error.code) err.code = error.code;
619620
if (error.details) err.details = error.details;
620621

621-
captureSupabaseError(err, 'auto.db.supabase.queue', { queueName, messageId, ...extraContext });
622+
captureSupabaseError(err, mechanismType, { queueName, messageId, ...extraContext });
622623
}
623624

624625
/** Returns latency from an enqueued_at timestamp in milliseconds. */
@@ -764,7 +765,9 @@ function instrumentRpcProducer(
764765
});
765766

766767
if (res.error) {
767-
captureQueueError(res.error, queueName, messageId, { operation: operationName });
768+
captureQueueError(res.error, queueName, 'auto.db.supabase.queue.producer', messageId, {
769+
operation: operationName,
770+
});
768771
}
769772

770773
span.setStatus({ code: res.error ? SPAN_STATUS_ERROR : SPAN_STATUS_OK });
@@ -774,7 +777,7 @@ function instrumentRpcProducer(
774777
(err: unknown) => {
775778
span.setStatus({ code: SPAN_STATUS_ERROR });
776779

777-
captureSupabaseError(err, 'auto.db.supabase.queue', { queueName, operation: operationName });
780+
captureSupabaseError(err, 'auto.db.supabase.queue.producer', { queueName, operation: operationName });
778781

779782
throw err;
780783
},
@@ -973,7 +976,7 @@ function instrumentRpcConsumer(
973976
});
974977

975978
if (cleanedRes.error) {
976-
captureQueueError(cleanedRes.error, queueName);
979+
captureQueueError(cleanedRes.error, queueName, 'auto.db.supabase.queue.consumer');
977980
}
978981

979982
return cleanedRes;
@@ -983,7 +986,7 @@ function instrumentRpcConsumer(
983986

984987
if (cleanedRes.error) {
985988
const messageId = extractMessageIds(cleanedData);
986-
captureQueueError(cleanedRes.error, queueName, messageId);
989+
captureQueueError(cleanedRes.error, queueName, 'auto.db.supabase.queue.consumer', messageId);
987990
}
988991

989992
span.setStatus({ code: cleanedRes.error ? SPAN_STATUS_ERROR : SPAN_STATUS_OK });
@@ -998,7 +1001,7 @@ function instrumentRpcConsumer(
9981001
data: { 'messaging.destination.name': queueName },
9991002
});
10001003

1001-
captureSupabaseError(err, 'auto.db.supabase.queue', { queueName });
1004+
captureSupabaseError(err, 'auto.db.supabase.queue.consumer', { queueName });
10021005

10031006
span.setStatus({ code: SPAN_STATUS_ERROR });
10041007
throw err;

packages/core/test/lib/integrations/supabase-queues.test.ts

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
22
import type { Client } from '../../../src';
33
import * as CurrentScopes from '../../../src/currentScopes';
4+
import * as exports from '../../../src/exports';
45
import type { SupabaseClientInstance, SupabaseResponse } from '../../../src/integrations/supabase';
56
import { instrumentSupabaseClient } from '../../../src/integrations/supabase';
67
import { SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN } from '../../../src/semanticAttributes';
@@ -156,6 +157,37 @@ describe('Supabase Queue Instrumentation', () => {
156157
).rejects.toThrow('Queue send failed');
157158
});
158159

160+
it('should capture producer errors with producer mechanism type', async () => {
161+
const captureExceptionSpy = vi.spyOn(exports, 'captureException').mockImplementation(() => '');
162+
163+
await callRpc(
164+
mockRpcFunction,
165+
mockSupabaseClient,
166+
'send',
167+
{ queue_name: 'test-queue', message: { foo: 'bar' } },
168+
ERROR_RESPONSE,
169+
);
170+
171+
expect(captureExceptionSpy).toHaveBeenCalledTimes(1);
172+
173+
// Execute the scope callback to verify mechanism type
174+
const scopeCallback = captureExceptionSpy.mock.calls[0]![1] as (scope: any) => any;
175+
const mockScope = { addEventProcessor: vi.fn().mockReturnThis(), setContext: vi.fn().mockReturnThis() };
176+
scopeCallback(mockScope);
177+
178+
const eventProcessor = mockScope.addEventProcessor.mock.calls[0]![0];
179+
const event = { exception: { values: [{}] } };
180+
eventProcessor(event);
181+
182+
expect(event.exception.values[0]).toEqual(
183+
expect.objectContaining({
184+
mechanism: expect.objectContaining({ type: 'auto.db.supabase.queue.producer' }),
185+
}),
186+
);
187+
188+
captureExceptionSpy.mockRestore();
189+
});
190+
159191
it('should not mutate original params for single send or batch send', async () => {
160192
const singleParams = { queue_name: 'test-queue', message: { foo: 'bar', nested: { value: 42 } } };
161193
const batchParams = { queue_name: 'test-queue', messages: [{ foo: 'bar' }, { baz: 'qux' }] };
@@ -308,6 +340,30 @@ describe('Supabase Queue Instrumentation', () => {
308340
expect(processSpanCall?.[0]?.name).toBe('process test-queue');
309341
});
310342

343+
it('should capture consumer errors with consumer mechanism type', async () => {
344+
const captureExceptionSpy = vi.spyOn(exports, 'captureException').mockImplementation(() => '');
345+
346+
await callRpc(mockRpcFunction, mockSupabaseClient, 'pop', { queue_name: 'test-queue' }, ERROR_RESPONSE);
347+
348+
expect(captureExceptionSpy).toHaveBeenCalledTimes(1);
349+
350+
const scopeCallback = captureExceptionSpy.mock.calls[0]![1] as (scope: any) => any;
351+
const mockScope = { addEventProcessor: vi.fn().mockReturnThis(), setContext: vi.fn().mockReturnThis() };
352+
scopeCallback(mockScope);
353+
354+
const eventProcessor = mockScope.addEventProcessor.mock.calls[0]![0];
355+
const event = { exception: { values: [{}] } };
356+
eventProcessor(event);
357+
358+
expect(event.exception.values[0]).toEqual(
359+
expect.objectContaining({
360+
mechanism: expect.objectContaining({ type: 'auto.db.supabase.queue.consumer' }),
361+
}),
362+
);
363+
364+
captureExceptionSpy.mockRestore();
365+
});
366+
311367
it('should set correct attributes on consumer span', async () => {
312368
const startSpanSpy = vi.spyOn(Tracing, 'startSpan');
313369

0 commit comments

Comments
 (0)