Skip to content

Commit c4e7c3f

Browse files
committed
refactor(otel): use job.createdAt instead of carrier injection for queue time
1 parent 5d69498 commit c4e7c3f

File tree

5 files changed

+20
-30
lines changed

5 files changed

+20
-30
lines changed

src/job_batch_dispatcher.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,13 +152,15 @@ export class JobBatchDispatcher<T> {
152152
const adapter = this.#getAdapterInstance()
153153
const wrapInternal = QueueManager.getInternalOperationWrapper()
154154

155+
const now = Date.now()
155156
const jobs = this.#payloads.map((payload) => ({
156157
id: randomUUID(),
157158
name: this.#name,
158159
payload,
159160
attempts: 0,
160161
priority: this.#priority,
161162
groupId: this.#groupId,
163+
createdAt: now,
162164
}))
163165

164166
const message: JobDispatchMessage = { jobs, queue: this.#queue }

src/job_dispatcher.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ export class JobDispatcher<T> {
196196
attempts: 0,
197197
priority: this.#priority,
198198
groupId: this.#groupId,
199+
createdAt: Date.now(),
199200
}
200201

201202
const message: JobDispatchMessage = { jobs: [jobData], queue: this.#queue, delay: parsedDelay }

src/otel.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,6 @@ export class QueueInstrumentation extends InstrumentationBase<QueueInstrumentati
183183
for (const job of message.jobs) {
184184
if (!job.traceContext) job.traceContext = {}
185185
propagation.inject(dispatchContext, job.traceContext)
186-
job.traceContext['boringqueue.dispatched_at'] = String(Date.now())
187186
}
188187

189188
this.dispatchSpans.set(message, span)
@@ -231,9 +230,8 @@ export class QueueInstrumentation extends InstrumentationBase<QueueInstrumentati
231230
baseContext
232231
)
233232

234-
const dispatchedAt = Number(job.traceContext?.['boringqueue.dispatched_at'])
235-
if (dispatchedAt) {
236-
span.setAttribute('messaging.job.queue_time_ms', Date.now() - dispatchedAt)
233+
if (job.createdAt) {
234+
span.setAttribute('messaging.job.queue_time_ms', Date.now() - job.createdAt)
237235
}
238236

239237
this.executeSpans.set(job.id, span)

src/types/main.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,12 @@ export interface JobData {
122122
*/
123123
groupId?: string
124124

125+
/**
126+
* Timestamp (ms) when the job was dispatched.
127+
* Used to compute queue wait time in OTel instrumentation.
128+
*/
129+
createdAt?: number
130+
125131
/**
126132
* Serialized trace context for distributed tracing.
127133
* Injected by OTel plugin at dispatch time.

tests/otel.spec.ts

Lines changed: 9 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -151,33 +151,17 @@ test.group('QueueInstrumentation | dispatch via DC', (group) => {
151151
instrumentation.disable()
152152
})
153153

154-
test('injects boringqueue.dispatched_at into traceContext', async ({ assert }) => {
154+
test('does not inject non-standard keys into traceContext', async ({ assert }) => {
155155
const { instrumentation } = await setupWithWrappers()
156156

157157
const jobData: JobData = { id: 'ts-1', name: 'TimestampJob', payload: {}, attempts: 0 }
158158
const message: JobDispatchMessage = { jobs: [jobData], queue: 'default' }
159159
await dispatchChannel.tracePromise(async () => {}, message)
160160

161161
assert.isDefined(jobData.traceContext)
162-
assert.isTrue('boringqueue.dispatched_at' in jobData.traceContext!)
163-
const dispatchedAt = Number(jobData.traceContext!['boringqueue.dispatched_at'])
164-
assert.isAbove(dispatchedAt, 0)
165-
assert.closeTo(dispatchedAt, Date.now(), 1000)
166-
167-
instrumentation.disable()
168-
})
169-
170-
test('batch dispatch injects boringqueue.dispatched_at into every job', async ({ assert }) => {
171-
const { instrumentation } = await setupWithWrappers()
172-
173-
const jobs: JobData[] = [
174-
{ id: 'bts-1', name: 'BatchTsJob', payload: {}, attempts: 0 },
175-
{ id: 'bts-2', name: 'BatchTsJob', payload: {}, attempts: 0 },
176-
]
177-
await dispatchChannel.tracePromise(async () => {}, { jobs, queue: 'default' })
178-
179-
assert.isTrue('boringqueue.dispatched_at' in jobs[0].traceContext!)
180-
assert.isTrue('boringqueue.dispatched_at' in jobs[1].traceContext!)
162+
for (const key of Object.keys(jobData.traceContext!)) {
163+
assert.isFalse(key.startsWith('boringqueue.'), `traceContext should not contain "${key}"`)
164+
}
181165

182166
instrumentation.disable()
183167
})
@@ -294,14 +278,13 @@ test.group('QueueInstrumentation | execute via executionWrapper', (group) => {
294278
instrumentation.disable()
295279
})
296280

297-
test('sets queue_time_ms attribute when boringqueue.dispatched_at is present', async ({ assert }) => {
281+
test('sets queue_time_ms attribute when createdAt is present', async ({ assert }) => {
298282
const { instrumentation, executionWrapper } = await setupWithWrappers()
299283

300-
const dispatchedAt = Date.now() - 500
301284
const job = makeJob({
302285
id: 'qt-1',
303286
name: 'QueueTimeJob',
304-
traceContext: { 'boringqueue.dispatched_at': String(dispatchedAt) },
287+
createdAt: Date.now() - 500,
305288
})
306289
const message: JobExecuteMessage = { job, queue: 'default', status: 'completed' }
307290

@@ -318,7 +301,7 @@ test.group('QueueInstrumentation | execute via executionWrapper', (group) => {
318301
instrumentation.disable()
319302
})
320303

321-
test('does not set queue_time_ms when boringqueue.dispatched_at is missing', async ({ assert }) => {
304+
test('does not set queue_time_ms when createdAt is missing', async ({ assert }) => {
322305
const { instrumentation, executionWrapper } = await setupWithWrappers()
323306

324307
const job = makeJob({ id: 'qt-2', name: 'NoQueueTimeJob' })
@@ -338,10 +321,10 @@ test.group('QueueInstrumentation | execute via executionWrapper', (group) => {
338321
test('queue_time_ms end-to-end via dispatch then execute', async ({ assert }) => {
339322
const { instrumentation, executionWrapper } = await setupWithWrappers()
340323

341-
const jobData: JobData = { id: 'e2e-qt-1', name: 'E2EQueueTimeJob', payload: {}, attempts: 0 }
324+
const jobData: JobData = { id: 'e2e-qt-1', name: 'E2EQueueTimeJob', payload: {}, attempts: 0, createdAt: Date.now() }
342325
await dispatchChannel.tracePromise(async () => {}, { jobs: [jobData], queue: 'default' })
343326

344-
const job = makeJob({ id: 'e2e-qt-1', name: 'E2EQueueTimeJob', traceContext: jobData.traceContext })
327+
const job = makeJob({ id: 'e2e-qt-1', name: 'E2EQueueTimeJob', createdAt: jobData.createdAt, traceContext: jobData.traceContext })
345328
const message: JobExecuteMessage = { job, queue: 'default', status: 'completed' }
346329

347330
await executeChannel.tracePromise(async () => {

0 commit comments

Comments
 (0)