Skip to content

Commit 9063d43

Browse files
committed
Clean up
1 parent c2237fa commit 9063d43

File tree

8 files changed

+22
-29
lines changed

8 files changed

+22
-29
lines changed

dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/consumer-error.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Enqueue a job to the queue
1+
// Consume from a non-existing queue to test error handling
22

33
import { NextApiRequest, NextApiResponse } from 'next';
44
import { createClient } from '@supabase/supabase-js';
@@ -14,7 +14,7 @@ const supabaseClient = createClient(NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_R
1414
Sentry.instrumentSupabaseClient(supabaseClient);
1515

1616
export default async function handler(req: NextApiRequest, res: NextApiResponse) {
17-
// Enqueue a job to the queue
17+
// Pop from a non-existing queue to trigger an error
1818
const { data, error } = await supabaseClient.schema('pgmq_public').rpc('pop', {
1919
queue_name: 'non-existing-queue',
2020
});

dev-packages/e2e-tests/test-applications/supabase-nextjs/pages/api/queue/consumer-rpc.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Enqueue a job to the queue
1+
// Consume a message from the queue using direct rpc() call
22

33
import { NextApiRequest, NextApiResponse } from 'next';
44
import { createClient } from '@supabase/supabase-js';
@@ -18,7 +18,7 @@ const supabaseClient = createClient(NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_R
1818
Sentry.instrumentSupabaseClient(supabaseClient);
1919

2020
export default async function handler(req: NextApiRequest, res: NextApiResponse) {
21-
// Enqueue a job to the queue
21+
// Pop a message from the queue
2222
const { data, error } = await supabaseClient.rpc('pop', {
2323
queue_name: 'todos',
2424
});

dev-packages/e2e-tests/test-applications/supabase-nextjs/tests/performance.test.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ test('Sends server-side Supabase RPC spans and breadcrumbs', async ({ page, base
3939
const httpTransactionPromise = waitForTransaction('supabase-nextjs', transactionEvent => {
4040
return Boolean(
4141
transactionEvent?.contexts?.trace?.op === 'http.server' &&
42-
transactionEvent?.transaction === 'GET /api/rpc/status',
42+
transactionEvent?.transaction === 'GET /api/rpc/status',
4343
);
4444
});
4545

@@ -352,8 +352,8 @@ test('Sends queue process spans with `schema(...).rpc(...)`', async ({ page, bas
352352
const producerTransactionPromise = waitForTransaction('supabase-nextjs', transactionEvent => {
353353
return Boolean(
354354
transactionEvent?.contexts?.trace?.op === 'http.server' &&
355-
transactionEvent?.transaction === 'GET /api/queue/producer-schema' &&
356-
transactionEvent?.spans?.some((span: any) => span.op === 'queue.publish'),
355+
transactionEvent?.transaction === 'GET /api/queue/producer-schema' &&
356+
transactionEvent?.spans?.some((span: any) => span.op === 'queue.publish'),
357357
);
358358
});
359359

@@ -369,8 +369,8 @@ test('Sends queue process spans with `schema(...).rpc(...)`', async ({ page, bas
369369
const httpTransactionPromise = waitForTransaction('supabase-nextjs', transactionEvent => {
370370
return Boolean(
371371
transactionEvent?.contexts?.trace?.op === 'http.server' &&
372-
transactionEvent?.transaction === 'GET /api/queue/consumer-schema' &&
373-
transactionEvent?.spans?.some((span: any) => span.op === 'queue.process'),
372+
transactionEvent?.transaction === 'GET /api/queue/consumer-schema' &&
373+
transactionEvent?.spans?.some((span: any) => span.op === 'queue.process'),
374374
);
375375
});
376376

@@ -575,7 +575,7 @@ test('Sends queue process error spans with `rpc(...)`', async ({ page, baseURL }
575575
const httpTransactionPromise = waitForTransaction('supabase-nextjs', transactionEvent => {
576576
return Boolean(
577577
transactionEvent?.contexts?.trace?.op === 'http.server' &&
578-
transactionEvent?.transaction === 'GET /api/queue/consumer-error',
578+
transactionEvent?.transaction === 'GET /api/queue/consumer-error',
579579
);
580580
});
581581

packages/core/src/integrations/supabase/constants.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,6 @@ export const FILTER_MAPPINGS = {
5252

5353
export const DB_OPERATIONS_TO_INSTRUMENT = ['select', 'insert', 'upsert', 'update', 'delete'];
5454

55-
export const QUEUE_RPC_OPERATIONS = new Set(['send', 'send_batch', 'pop', 'receive', 'read']);
56-
5755
export const INTEGRATION_NAME = 'Supabase';
5856

5957
/**

packages/core/src/integrations/supabase/queue-consumer.ts

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,7 @@ function _processConsumerSpan(span: Span, res: SupabaseResponse, queueName: stri
4545
span.setAttribute('messaging.message.retry.count', 0);
4646
span.setStatus({ code: res.error ? SPAN_STATUS_ERROR : SPAN_STATUS_OK });
4747

48-
const breadcrumbData: Record<string, unknown> = {};
49-
if (queueName) {
50-
breadcrumbData['messaging.destination.name'] = queueName;
51-
}
52-
_createQueueBreadcrumb('queue.process', queueName, Object.keys(breadcrumbData).length ? breadcrumbData : undefined);
48+
_createQueueBreadcrumb('queue.process', queueName, queueName ? { 'messaging.destination.name': queueName } : undefined);
5349

5450
if (res.error) {
5551
_captureQueueError(res.error, queueName);
@@ -136,9 +132,7 @@ export function _instrumentRpcConsumer(target: unknown, thisArg: unknown, argume
136132
queueName,
137133
});
138134

139-
const spanName = `process ${queueName || 'unknown'}`;
140-
// Cloudflare pattern: op='db.queue' for valid transactions, 'queue.process' for Queue Insights.
141-
// Works both as child spans and root spans.
135+
const spanName = `process ${queueName}`;
142136
const spanAttributes = {
143137
[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.db.supabase.queue.consumer',
144138
[SEMANTIC_ATTRIBUTE_SENTRY_OP]: 'queue.process',
@@ -188,6 +182,8 @@ export function _instrumentRpcConsumer(target: unknown, thisArg: unknown, argume
188182
const sentryTrace = firstMessage?._sentry?.sentry_trace;
189183

190184
// Clean up _sentry metadata from messages before returning to user
185+
// Shallow copy to avoid mutating the Supabase response object
186+
let cleanedRes = res;
191187
if (Array.isArray(res.data)) {
192188
const hasMetadata = res.data.some(
193189
item =>
@@ -199,14 +195,15 @@ export function _instrumentRpcConsumer(target: unknown, thisArg: unknown, argume
199195
);
200196

201197
if (hasMetadata) {
202-
res.data = res.data.map(item => {
198+
const cleanedData = res.data.map(item => {
203199
if (item && typeof item === 'object' && item.message && typeof item.message === 'object') {
204200
const messageCopy = { ...(item.message as Record<string, unknown>) };
205201
delete messageCopy._sentry;
206202
return { ...item, message: messageCopy };
207203
}
208204
return item;
209205
});
206+
cleanedRes = { ...res, data: cleanedData };
210207
}
211208
}
212209

@@ -227,7 +224,7 @@ export function _instrumentRpcConsumer(target: unknown, thisArg: unknown, argume
227224
}
228225

229226
try {
230-
const processedResponse = _processConsumerSpan(span, res, queueName);
227+
const processedResponse = _processConsumerSpan(span, cleanedRes, queueName);
231228
DEBUG_BUILD && debug.log('Consumer span processed successfully', { queueName });
232229
return processedResponse;
233230
} catch (err: unknown) {
@@ -236,7 +233,7 @@ export function _instrumentRpcConsumer(target: unknown, thisArg: unknown, argume
236233
captureSupabaseError(err, 'auto.db.supabase.queue', { queueName });
237234

238235
span.setStatus({ code: SPAN_STATUS_ERROR });
239-
return res;
236+
return cleanedRes;
240237
}
241238
},
242239
(err: unknown) => {

packages/core/src/integrations/supabase/queue-producer.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,9 @@ export function _instrumentRpcProducer(target: unknown, thisArg: unknown, argume
6060

6161
const messageBodySize = _calculateMessageBodySize(queueParams?.message || queueParams?.messages);
6262

63-
// Cloudflare pattern: op='db.queue' for valid transactions, 'queue.publish' for Queue Insights.
64-
// Works both as child spans and root spans.
6563
return startSpan(
6664
{
67-
name: `publish ${queueName || 'unknown'}`,
65+
name: `publish ${queueName}`,
6866
op: 'db.queue',
6967
attributes: {
7068
[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.db.supabase.queue.producer',

packages/core/src/integrations/supabase/queue-utils.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ export function _createQueueBreadcrumb(
6161
addBreadcrumb(breadcrumb);
6262
}
6363

64-
/** Calculates the size of a message body in bytes, or undefined if too large or not serializable. */
64+
/** Calculates the approximate size of a message body (character count), or undefined if too large or not serializable. */
6565
export function _calculateMessageBodySize(message: unknown): number | undefined {
6666
if (!message) {
6767
return undefined;
@@ -95,7 +95,7 @@ export function _captureQueueError(
9595
captureSupabaseError(err, 'auto.db.supabase.queue', { queueName, messageId, ...extraContext });
9696
}
9797

98-
/** Parses an enqueued_at timestamp and returns the latency in milliseconds. */
98+
/** Parses an enqueued_at timestamp and returns the latency in milliseconds. Assumes Supabase/PGMQ timestamptz (ISO 8601 with timezone). */
9999
export function _parseEnqueuedAtLatency(enqueuedAt: string | undefined): number | undefined {
100100
if (!enqueuedAt) {
101101
return undefined;

packages/core/test/lib/integrations/supabase-queues.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -711,7 +711,7 @@ describe('Supabase Queue Instrumentation', () => {
711711
});
712712

713713
describe('Non-Queue RPC Operations', () => {
714-
it('should not instrument non-queue RPC calls', async () => {
714+
it('should not instrument non-queue RPC calls as queue operations', async () => {
715715
const mockResponse = { data: { result: 'success' } };
716716
mockRpcFunction.mockResolvedValue(mockResponse);
717717
instrumentSupabaseClient(mockSupabaseClient);

0 commit comments

Comments
 (0)