Skip to content

Commit 83db97d

Browse files
committed
feat(otel): add queue_time_ms span attribute
1 parent cbb2aa4 commit 83db97d

File tree

3 files changed

+101
-0
lines changed

3 files changed

+101
-0
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,7 @@ The instrumentation uses standard [OTel messaging semantic conventions](https://
560560
| `messaging.job.group_id` | Custom | Queue-specific group identifier |
561561
| `messaging.job.priority` | Custom | Queue-specific job priority |
562562
| `messaging.job.delay_ms` | Custom | Delay before the job becomes available |
563+
| `messaging.job.queue_time_ms` | Custom | Time spent waiting in queue before processing |
563564

564565
### Trace Context Propagation
565566

src/otel.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ export class QueueInstrumentation extends InstrumentationBase<QueueInstrumentati
183183
for (const job of message.jobs) {
184184
if (!job.traceContext) job.traceContext = {}
185185
propagation.inject(dispatchContext, job.traceContext)
186+
job.traceContext['boringqueue.dispatched_at'] = String(Date.now())
186187
}
187188

188189
this.dispatchSpans.set(message, span)
@@ -230,6 +231,11 @@ export class QueueInstrumentation extends InstrumentationBase<QueueInstrumentati
230231
baseContext
231232
)
232233

234+
const dispatchedAt = Number(job.traceContext?.['boringqueue.dispatched_at'])
235+
if (dispatchedAt) {
236+
span.setAttribute('messaging.job.queue_time_ms', Date.now() - dispatchedAt)
237+
}
238+
233239
this.executeSpans.set(job.id, span)
234240
const executionContext = trace.setSpan(baseContext, span)
235241

tests/otel.spec.ts

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,37 @@ test.group('QueueInstrumentation | dispatch via DC', (group) => {
151151
instrumentation.disable()
152152
})
153153

154+
test('injects boringqueue.dispatched_at into traceContext', async ({ assert }) => {
155+
const { instrumentation } = await setupWithWrappers()
156+
157+
const jobData: JobData = { id: 'ts-1', name: 'TimestampJob', payload: {}, attempts: 0 }
158+
const message: JobDispatchMessage = { jobs: [jobData], queue: 'default' }
159+
await dispatchChannel.tracePromise(async () => {}, message)
160+
161+
assert.isDefined(jobData.traceContext)
162+
assert.isTrue('boringqueue.dispatched_at' in jobData.traceContext!)
163+
const dispatchedAt = Number(jobData.traceContext!['boringqueue.dispatched_at'])
164+
assert.isAbove(dispatchedAt, 0)
165+
assert.closeTo(dispatchedAt, Date.now(), 1000)
166+
167+
instrumentation.disable()
168+
})
169+
170+
test('batch dispatch injects boringqueue.dispatched_at into every job', async ({ assert }) => {
171+
const { instrumentation } = await setupWithWrappers()
172+
173+
const jobs: JobData[] = [
174+
{ id: 'bts-1', name: 'BatchTsJob', payload: {}, attempts: 0 },
175+
{ id: 'bts-2', name: 'BatchTsJob', payload: {}, attempts: 0 },
176+
]
177+
await dispatchChannel.tracePromise(async () => {}, { jobs, queue: 'default' })
178+
179+
assert.isTrue('boringqueue.dispatched_at' in jobs[0].traceContext!)
180+
assert.isTrue('boringqueue.dispatched_at' in jobs[1].traceContext!)
181+
182+
instrumentation.disable()
183+
})
184+
154185
test('batch dispatch injects trace context into every job', async ({ assert }) => {
155186
const { instrumentation } = await setupWithWrappers()
156187

@@ -263,6 +294,69 @@ test.group('QueueInstrumentation | execute via executionWrapper', (group) => {
263294
instrumentation.disable()
264295
})
265296

297+
test('sets queue_time_ms attribute when boringqueue.dispatched_at is present', async ({ assert }) => {
298+
const { instrumentation, executionWrapper } = await setupWithWrappers()
299+
300+
const dispatchedAt = Date.now() - 500
301+
const job = makeJob({
302+
id: 'qt-1',
303+
name: 'QueueTimeJob',
304+
traceContext: { 'boringqueue.dispatched_at': String(dispatchedAt) },
305+
})
306+
const message: JobExecuteMessage = { job, queue: 'default', status: 'completed' }
307+
308+
await executeChannel.tracePromise(async () => {
309+
await executionWrapper(async () => {}, job, 'default')
310+
}, message)
311+
312+
const spans = getFinishedSpans()
313+
const span = spans.find((s) => s.kind === SpanKind.CONSUMER)
314+
const queueTime = span!.attributes['messaging.job.queue_time_ms'] as number
315+
assert.isDefined(queueTime)
316+
assert.isAtLeast(queueTime, 400)
317+
318+
instrumentation.disable()
319+
})
320+
321+
test('does not set queue_time_ms when boringqueue.dispatched_at is missing', async ({ assert }) => {
322+
const { instrumentation, executionWrapper } = await setupWithWrappers()
323+
324+
const job = makeJob({ id: 'qt-2', name: 'NoQueueTimeJob' })
325+
const message: JobExecuteMessage = { job, queue: 'default', status: 'completed' }
326+
327+
await executeChannel.tracePromise(async () => {
328+
await executionWrapper(async () => {}, job, 'default')
329+
}, message)
330+
331+
const spans = getFinishedSpans()
332+
const span = spans.find((s) => s.kind === SpanKind.CONSUMER)
333+
assert.notProperty(span!.attributes, 'messaging.job.queue_time_ms')
334+
335+
instrumentation.disable()
336+
})
337+
338+
test('queue_time_ms end-to-end via dispatch then execute', async ({ assert }) => {
339+
const { instrumentation, executionWrapper } = await setupWithWrappers()
340+
341+
const jobData: JobData = { id: 'e2e-qt-1', name: 'E2EQueueTimeJob', payload: {}, attempts: 0 }
342+
await dispatchChannel.tracePromise(async () => {}, { jobs: [jobData], queue: 'default' })
343+
344+
const job = makeJob({ id: 'e2e-qt-1', name: 'E2EQueueTimeJob', traceContext: jobData.traceContext })
345+
const message: JobExecuteMessage = { job, queue: 'default', status: 'completed' }
346+
347+
await executeChannel.tracePromise(async () => {
348+
await executionWrapper(async () => {}, job, 'default')
349+
}, message)
350+
351+
const spans = getFinishedSpans()
352+
const span = spans.find((s) => s.kind === SpanKind.CONSUMER)
353+
const queueTime = span!.attributes['messaging.job.queue_time_ms'] as number
354+
assert.isDefined(queueTime)
355+
assert.isAtLeast(queueTime, 0)
356+
357+
instrumentation.disable()
358+
})
359+
266360
test('child spans are parented to consumer span', async ({ assert }) => {
267361
const { instrumentation, executionWrapper } = await setupWithWrappers()
268362

0 commit comments

Comments
 (0)