fix(worker): propagate process <job_name> span's metadata into processor function so child spans are correctly nested#4064
Conversation
There was a problem hiding this comment.
Pull request overview
This PR fixes telemetry context propagation so spans created inside a worker’s processor function are parented under the process <job_name> span (instead of incorrectly inheriting from the add <job_name> span).
Changes:
- Update
Worker.processJobto requestdstPropagationMetadatafromtrace()and overwritejob.opts.telemetry.metadataright before invoking the processor. - Adjust telemetry interface tests to validate the processor receives updated propagation metadata during job processing.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
src/classes/worker.ts |
Uses trace()’s destination propagation metadata to ensure processor-created spans nest under the process span. |
tests/telemetry_interface.test.ts |
Adds an assertion intended to verify the processor sees updated telemetry metadata. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
Thanks for the PR. In theory the job.opts object should be immutable, so rewriting the telemetry metadata is breaking this contract. I wonder if there is a better way to do this, maybe we need to enhance the telemetry interface. |
|
Two options I can think of are:
|
fb794e5 to
160361f
Compare
|
I've tried to approach the solution differently to avoid breaking the immutability of |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated 5 comments.
Comments suppressed due to low confidence (1)
src/classes/worker.ts:1007
- In
processJob, the callback parameter is namedsrcPropagationMetadata, but the 2nd callback arg provided bytrace()is actually destination propagation metadata (the metadata for the newprocessspan context). This naming (and shadowing the outersrcPropagationMetadataconst) makes it easy to confuse the two and accidentally propagate the wrong value. Renaming the inner parameter to something likedstPropagationMetadatawould make the intent clearer.
async (span, srcPropagationMetadata) => {
span?.setAttributes({
[TelemetryAttributes.WorkerId]: this.id,
[TelemetryAttributes.WorkerName]: this.opts.name,
[TelemetryAttributes.JobId]: job.id,
[TelemetryAttributes.JobName]: job.name,
});
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| return this.processFn( | ||
| job.cloneWithOpts({ | ||
| telemetry: { | ||
| ...job.opts.telemetry, | ||
| metadata: job.opts.telemetry.omitContext | ||
| ? undefined | ||
| : srcPropagationMetadata, | ||
| }, | ||
| }), | ||
| token, | ||
| signal, | ||
| ); |
There was a problem hiding this comment.
In the sandboxed path, passing a cloned Job instance into processFn changes which Job object the sandbox bridge operates on (see sandbox.ts: progress/log/move commands call methods on the job instance passed into sandbox()). This can make progress events emit with the cloned job while completed/failed emit with the original job, and can desync in-memory fields between the two instances. Prefer keeping the original job instance for sandbox communication and only overriding the telemetry metadata in the serialized payload sent to the child.
| return this.processFn( | |
| job.cloneWithOpts({ | |
| telemetry: { | |
| ...job.opts.telemetry, | |
| metadata: job.opts.telemetry.omitContext | |
| ? undefined | |
| : srcPropagationMetadata, | |
| }, | |
| }), | |
| token, | |
| signal, | |
| ); | |
| const telemetry = job.opts.telemetry; | |
| const originalMetadata = telemetry.metadata; | |
| telemetry.metadata = telemetry.omitContext | |
| ? undefined | |
| : srcPropagationMetadata; | |
| try { | |
| return this.processFn(job, token, signal); | |
| } finally { | |
| telemetry.metadata = originalMetadata; | |
| } |
There was a problem hiding this comment.
This one is actually a big problem, which makes me think that cloneWithOpts might be pointless - since I want to avoid cloning a job and then cloning that job back into original-ish job. I think it's way better to simply break the immutability of job.opts in this one case and re-assign the metadata back so it's all encapsulated in one place.
| ...job.opts.telemetry, | ||
| metadata: job.opts.telemetry.omitContext | ||
| ? undefined | ||
| : srcPropagationMetadata, |
There was a problem hiding this comment.
callProcessJob overwrites telemetry.metadata with srcPropagationMetadata even when it is undefined/empty (e.g. when this.opts.telemetry is disabled, trace() calls the callback with no args, or when getMetadata() returns an empty string). In those cases this will clear any existing user-provided job.opts.telemetry.metadata for sandboxed workers. Guard the overwrite so it only happens when you actually have a new propagation metadata to apply (and otherwise preserve the existing metadata), while still honoring omitContext.
| : srcPropagationMetadata, | |
| : srcPropagationMetadata | |
| ? srcPropagationMetadata | |
| : job.opts.telemetry.metadata, |
| describe('.cloneWithOpts', () => { | ||
| it('should clone one-off debounced and delayed job', async () => { | ||
| const sourceJob = await Job.create( | ||
| queue, | ||
| 'test_job', | ||
| { foo: 'bar' }, | ||
| { | ||
| jobId: 'custom_job_id', | ||
| delay: 1000, | ||
| deduplication: { id: 'dedup-id' }, | ||
| debounce: { id: 'debounce-id' }, | ||
| backoff: { type: 'fixed', jitter: 0 }, | ||
| telemetry: { | ||
| metadata: 'source_metadata', | ||
| }, | ||
| }, | ||
| ); | ||
|
|
||
| const clonedJob = sourceJob.cloneWithOpts({ | ||
| telemetry: { | ||
| metadata: 'target_metadata', | ||
| }, | ||
| }); | ||
|
|
||
| expect(clonedJob.asJSON()).toEqual({ | ||
| ...sourceJob.asJSON(), | ||
| opts: { | ||
| ...sourceJob.asJSON().opts, | ||
| // timestamp for cloned job is passed as opts, | ||
| // whereas for sourceJob it's autogenerated hence the difference | ||
| timestamp: sourceJob.timestamp, | ||
| tm: clonedJob.asJSON().opts.tm, | ||
| }, | ||
| }); |
There was a problem hiding this comment.
These new .cloneWithOpts tests don't currently assert that the telemetry metadata actually changes from the source value to the target value (the expectation sets tm to clonedJob.asJSON().opts.tm, which will pass even if cloneWithOpts fails to apply the override). Add assertions that sourceJob.asJSON().opts.tm === 'source_metadata' and clonedJob.asJSON().opts.tm === 'target_metadata' (and/or that omitContext behaves as expected) so the test validates the intended behavior.
|
|
||
| const addSpan = startSpanSpy.returnValues[0] as MockSpan; | ||
| expect(addSpan).toBeInstanceOf(MockSpan); | ||
| expect(addSpan.name).toBe(`add ${queueName}.${job.name}`); | ||
| expect(addSpan.options?.kind).toBe(SpanKind.PRODUCER); | ||
| expect(addSpan.attributes[TelemetryAttributes.QueueName]).toBe( | ||
| queue.name, | ||
| ); | ||
| expect(addSpan.attributes[TelemetryAttributes.QueueOperation]).toBe( | ||
| 'add', | ||
| ); | ||
| expect(addSpan.attributes[TelemetryAttributes.JobName]).toBe(job.name); | ||
| expect(addSpan.attributes[TelemetryAttributes.JobId]).toBe(job.id); | ||
|
|
||
| const processSpan = startSpanSpy.returnValues[1] as MockSpan; | ||
| expect(processSpan).toBeInstanceOf(MockSpan); | ||
| expect(processSpan.name).toBe(`process ${queueName}`); | ||
| expect(processSpan.options?.kind).toBe(SpanKind.CONSUMER); | ||
| expect(processSpan.attributes[TelemetryAttributes.WorkerId]).toBe( | ||
| worker.id, | ||
| ); | ||
| expect(processSpan.attributes[TelemetryAttributes.WorkerName]).toBe( | ||
| 'testWorker', | ||
| ); | ||
| expect(processSpan.attributes[TelemetryAttributes.JobId]).toBe(job.id); | ||
|
|
||
| expect(processContextMetadata['getMetadata_span']).toBe(processSpan.name); |
There was a problem hiding this comment.
This test assumes startSpanSpy.returnValues[0] is always the add ... span and [1] is always the process ... span. Because the Worker autoruns on construction and calls getNextJob() (which is traced) concurrently with queue.add(), additional spans can be started before add/process, making these indices nondeterministic and the test flaky. Instead, locate spans by name/kind (e.g., find the first span whose name starts with add ${queueName}. and the one equal to process ${queueName}) or disable autorun and start the worker after the spy is attached.
| const addSpan = startSpanSpy.returnValues[0] as MockSpan; | |
| expect(addSpan).toBeInstanceOf(MockSpan); | |
| expect(addSpan.name).toBe(`add ${queueName}.${job.name}`); | |
| expect(addSpan.options?.kind).toBe(SpanKind.PRODUCER); | |
| expect(addSpan.attributes[TelemetryAttributes.QueueName]).toBe( | |
| queue.name, | |
| ); | |
| expect(addSpan.attributes[TelemetryAttributes.QueueOperation]).toBe( | |
| 'add', | |
| ); | |
| expect(addSpan.attributes[TelemetryAttributes.JobName]).toBe(job.name); | |
| expect(addSpan.attributes[TelemetryAttributes.JobId]).toBe(job.id); | |
| const processSpan = startSpanSpy.returnValues[1] as MockSpan; | |
| expect(processSpan).toBeInstanceOf(MockSpan); | |
| expect(processSpan.name).toBe(`process ${queueName}`); | |
| expect(processSpan.options?.kind).toBe(SpanKind.CONSUMER); | |
| expect(processSpan.attributes[TelemetryAttributes.WorkerId]).toBe( | |
| worker.id, | |
| ); | |
| expect(processSpan.attributes[TelemetryAttributes.WorkerName]).toBe( | |
| 'testWorker', | |
| ); | |
| expect(processSpan.attributes[TelemetryAttributes.JobId]).toBe(job.id); | |
| expect(processContextMetadata['getMetadata_span']).toBe(processSpan.name); | |
| const spans = startSpanSpy.returnValues as MockSpan[]; | |
| const addSpan = spans.find( | |
| span => span.name === `add ${queueName}.${job.name}`, | |
| ); | |
| expect(addSpan).toBeInstanceOf(MockSpan); | |
| expect(addSpan?.name).toBe(`add ${queueName}.${job.name}`); | |
| expect(addSpan?.options?.kind).toBe(SpanKind.PRODUCER); | |
| expect(addSpan?.attributes[TelemetryAttributes.QueueName]).toBe( | |
| queue.name, | |
| ); | |
| expect(addSpan?.attributes[TelemetryAttributes.QueueOperation]).toBe( | |
| 'add', | |
| ); | |
| expect(addSpan?.attributes[TelemetryAttributes.JobName]).toBe(job.name); | |
| expect(addSpan?.attributes[TelemetryAttributes.JobId]).toBe(job.id); | |
| const processSpan = spans.find( | |
| span => span.name === `process ${queueName}`, | |
| ); | |
| expect(processSpan).toBeInstanceOf(MockSpan); | |
| expect(processSpan?.name).toBe(`process ${queueName}`); | |
| expect(processSpan?.options?.kind).toBe(SpanKind.CONSUMER); | |
| expect(processSpan?.attributes[TelemetryAttributes.WorkerId]).toBe( | |
| worker.id, | |
| ); | |
| expect(processSpan?.attributes[TelemetryAttributes.WorkerName]).toBe( | |
| 'testWorker', | |
| ); | |
| expect(processSpan?.attributes[TelemetryAttributes.JobId]).toBe(job.id); | |
| expect(processContextMetadata['getMetadata_span']).toBe(processSpan?.name); |
| return new Job<DataType, ReturnType, NameType>( | ||
| this.queue, | ||
| this.name, | ||
| this.data, | ||
| { | ||
| ...this.opts, | ||
| timestamp: this.timestamp, | ||
| ...opts, | ||
| }, | ||
| this.id, | ||
| ); |
There was a problem hiding this comment.
cloneWithOpts currently constructs a brand-new Job instance via new Job(...), which reinitializes runtime state (e.g. progress, attemptsMade, attemptsStarted, stalledCounter, processedOn, finishedOn, failedReason, stacktrace, etc.). Since this method is now used for sandbox processing, the JSON sent to the child (via asJSONSandbox()) can lose these fields, changing observable behavior inside processors (and potentially retry/attempt logic). Consider cloning by copying the existing instance fields and only replacing opts (or provide an API to generate asJSONSandbox() with overridden opts) so the cloned job preserves all current job state.
| return new Job<DataType, ReturnType, NameType>( | |
| this.queue, | |
| this.name, | |
| this.data, | |
| { | |
| ...this.opts, | |
| timestamp: this.timestamp, | |
| ...opts, | |
| }, | |
| this.id, | |
| ); | |
| const cloned = Object.assign( | |
| Object.create(Object.getPrototypeOf(this)), | |
| this, | |
| ) as Job<DataType, ReturnType, NameType>; | |
| cloned.opts = { | |
| ...this.opts, | |
| timestamp: this.timestamp, | |
| ...opts, | |
| }; | |
| return cloned; |
Why
When setting up custom telemetry spans, the propagated context in
job.opts.telemetry.metadatapoints to theadd <job_namespan as a parent instead ofprocess <job_name>. This causes the relationship between spans to be incorrect in a sandboxed worker. For non-sandboxed workers, the context is propagated correctly thanks to OTEL, but there's no automatic propagation between worker threads/child process boundries.Current behaviour:
Notice
my-span-foowhich is created inside a worker's processor function, is a sibling ofprocess dicerollerinstead of being it's child.Expected behaviour:
How
The fix is to use
srcPropagationMetadata(2nd argument of acallbackfromtracemethod) and overwritejob.opts.telemetry.metadataright before processing the job. This way a propagation metadata ofprocess <job_name>will be used instead ofadd <job_name>. Sincejob.optsshould be immutable, I introduced a method to clone a job with the ability to overwrite theopts. For each attempt at processing the job, before it's handed off for execution by the sandboxed processor, it clones the job and assigns process span propagation metadata. This way, custom spans inside the processor are children of each respective attempt at processing.Additional context
Here's the (sandbox) worker implementation for reference:
Using Sentry with OTEL, but it will behave the same by using
@opentelemetry/apias well.