
fix(worker): propagate process <job_name> span's metadata into processor function so child spans are correctly nested#4064

Open
zamotany wants to merge 1 commit into taskforcesh:master from zamotany:master

Conversation

@zamotany

@zamotany zamotany commented Apr 18, 2026

Why

When setting up custom telemetry spans, the propagated context in job.opts.telemetry.metadata points to the add <job_name> span as the parent instead of process <job_name>. This makes the relationship between spans incorrect in a sandboxed worker. For non-sandboxed workers, the context is propagated correctly thanks to OTEL, but there is no automatic propagation across worker thread or child process boundaries.

Current behaviour:

Screenshot 2026-04-16 at 15 24 59

Notice that my-span-foo, which is created inside the worker's processor function, is a sibling of process diceroller instead of being its child.

Expected behaviour:

Screenshot 2026-04-17 at 12 36 45

How

The fix is to use srcPropagationMetadata (the 2nd argument of the callback passed to the trace method) and overwrite job.opts.telemetry.metadata right before processing the job. This way, the propagation metadata of process <job_name> is used instead of that of add <job_name>. Since job.opts should be immutable, I introduced a method to clone a job with the ability to overwrite the opts. For each processing attempt, before the job is handed off to the sandboxed processor, the worker clones the job and assigns the process span's propagation metadata. This way, custom spans inside the processor are children of their respective processing attempt.
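
To make the flow above concrete, here is a minimal, self-contained sketch. It is not BullMQ's code: `JobLike`, `traceProcess`, `processAttempt` and the free function `cloneWithOpts` are hypothetical stand-ins, and plain strings stand in for real propagation metadata.

```typescript
// Hypothetical stand-ins; the real Job class and trace() helper are more involved.
interface TelemetryOpts {
  metadata?: string;
  omitContext?: boolean;
}

interface JobLike {
  readonly name: string;
  readonly opts: Readonly<{ telemetry?: Readonly<TelemetryOpts> }>;
}

// Returns a copy of the job with some opts overridden; the source job is untouched.
function cloneWithOpts(job: JobLike, opts: JobLike['opts']): JobLike {
  return { ...job, opts: { ...job.opts, ...opts } };
}

let attemptCounter = 0;

// Simulates a trace helper: "starts" a process span and hands the callback
// propagation metadata that identifies that new span.
function traceProcess<T>(
  jobName: string,
  callback: (processSpanMetadata: string) => T,
): T {
  attemptCounter += 1;
  return callback(`process ${jobName} #${attemptCounter}`);
}

// One processing attempt: the processor receives a clone whose telemetry
// metadata points at the process span created for this attempt.
function processAttempt<T>(job: JobLike, processor: (job: JobLike) => T): T {
  return traceProcess(job.name, processSpanMetadata =>
    processor(
      cloneWithOpts(job, {
        telemetry: { ...job.opts.telemetry, metadata: processSpanMetadata },
      }),
    ),
  );
}
```

In this sketch every call to `processAttempt` hands the processor different metadata, while the stored job keeps the metadata of the add span.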

Additional context

Here's the (sandbox) worker implementation for reference:

import * as Sentry from '@sentry/node'
import { context, propagation } from '@opentelemetry/api'

function rollTheDice(rolls, min, max) {
  // ...
}

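export default function (job) {
  // Rebuild the parent context from the propagation metadata carried in the job options.
  const parentContext = propagation.extract(context.active(), JSON.parse(job.opts.tm));

  // Run the work inside that context so my-span-foo is parented to the propagated span.
  return context.with(parentContext, () => {
    return Sentry.startSpan({ name: 'my-span-foo' }, () => {
      return rollTheDice(job.data.rolls, job.data.min, job.data.max)
    })
  })
}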

This example uses Sentry with OTEL, but the behaviour is the same when using @opentelemetry/api directly.
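
The processor above calls JSON.parse on job.opts.tm directly, which throws if the metadata is missing (for example when omitContext is set) or malformed. A defensive variant could look like the sketch below. `parseCarrier` is a hypothetical helper, not part of BullMQ or @opentelemetry/api, and it assumes the metadata is a JSON object of string headers.

```typescript
// Hypothetical helper: turns serialized propagation metadata into a carrier object,
// falling back to an empty carrier instead of throwing.
type Carrier = Record<string, string>;

function parseCarrier(tm: unknown): Carrier {
  if (typeof tm !== 'string' || tm.length === 0) {
    return {};
  }
  try {
    const parsed: unknown = JSON.parse(tm);
    if (parsed === null || typeof parsed !== 'object' || Array.isArray(parsed)) {
      return {};
    }
    // Keep only string-valued entries, which is the assumed carrier shape.
    const carrier: Carrier = {};
    for (const [key, value] of Object.entries(parsed)) {
      if (typeof value === 'string') {
        carrier[key] = value;
      }
    }
    return carrier;
  } catch (_error) {
    // Malformed JSON: behave as if no context was propagated.
    return {};
  }
}
```

In the processor it would be used as propagation.extract(context.active(), parseCarrier(job.opts.tm)); with an empty carrier, new spans simply have no propagated parent.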

Contributor

Copilot AI left a comment


Pull request overview

This PR fixes telemetry context propagation so spans created inside a worker’s processor function are parented under the process <job_name> span (instead of incorrectly inheriting from the add <job_name> span).

Changes:

  • Update Worker.processJob to request dstPropagationMetadata from trace() and overwrite job.opts.telemetry.metadata right before invoking the processor.
  • Adjust telemetry interface tests to validate the processor receives updated propagation metadata during job processing.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

| File | Description |
| --- | --- |
| src/classes/worker.ts | Uses trace()’s destination propagation metadata to ensure processor-created spans nest under the process span. |
| tests/telemetry_interface.test.ts | Adds an assertion intended to verify the processor sees updated telemetry metadata. |


Comment thread tests/telemetry_interface.test.ts Outdated
Comment thread src/classes/worker.ts Outdated
@manast
Contributor

manast commented Apr 20, 2026

Thanks for the PR. In theory the job.opts object should be immutable, so rewriting the telemetry metadata is breaking this contract. I wonder if there is a better way to do this, maybe we need to enhance the telemetry interface.

@zamotany
Author

Two options I can think of are:

  1. Quicker, but with more boilerplate on the developer's side: propagate the current context metadata, if not in job.opts.telemetry then in some new property on Job.
  2. A built-in solution that automatically sets the correct context even for sandboxed processors: before calling the developer-provided process function, it would set up the context first. The correct context metadata can then simply be passed around as function arguments.
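
A rough sketch of what option 2 could look like. Everything here is hypothetical: `withParent` and `startSpan` are toy, synchronous stand-ins for a context manager and a tracer, and `wrapProcessor` is an invented name, not an existing BullMQ API. A real implementation would need an async-aware context.

```typescript
// Toy, synchronous stand-in for a context manager such as OTEL's context.with().
let activeParent: string | undefined;

function withParent<T>(parent: string | undefined, fn: () => T): T {
  const previous = activeParent;
  activeParent = parent;
  try {
    return fn();
  } finally {
    // Always restore the previous parent, even if fn throws.
    activeParent = previous;
  }
}

// Toy span factory: records which parent was active when the span started.
function startSpan(name: string): { name: string; parent: string | undefined } {
  return { name, parent: activeParent };
}

type SandboxJob = { name: string; data: unknown };
type Processor<T> = (job: SandboxJob) => T;

// Hypothetical wrapper applied on the sandbox side: it receives the process
// span metadata as a plain argument and activates it before the developer's
// processor runs, so the processor needs no extraction boilerplate.
function wrapProcessor<T>(processor: Processor<T>) {
  return (job: SandboxJob, processSpanMetadata?: string): T =>
    withParent(processSpanMetadata, () => processor(job));
}
```

With this shape, job.opts is never modified; the metadata travels alongside the job as a separate argument.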

@zamotany zamotany force-pushed the master branch 2 times, most recently from fb794e5 to 160361f Compare April 22, 2026 21:22
@zamotany
Author

@manast

I've tried to approach the solution differently to avoid breaking the immutability of job.opts, but at the end of the day the best place to store the process span's propagation context is inside job.opts, and only at the time of actual processing. The process span is different for each processing attempt, so the job.opts.tm stored in Redis must keep pointing to the add <job_name> span. So I decided to simply allow cloning the job with the ability to overwrite opts. Also, this context propagation issue only affects sandboxed processors.

Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 5 comments.

Comments suppressed due to low confidence (1)

src/classes/worker.ts:1007

  • In processJob, the callback parameter is named srcPropagationMetadata, but the 2nd callback arg provided by trace() is actually destination propagation metadata (the metadata for the new process span context). This naming (and shadowing the outer srcPropagationMetadata const) makes it easy to confuse the two and accidentally propagate the wrong value. Renaming the inner parameter to something like dstPropagationMetadata would make the intent clearer.
      async (span, srcPropagationMetadata) => {
        span?.setAttributes({
          [TelemetryAttributes.WorkerId]: this.id,
          [TelemetryAttributes.WorkerName]: this.opts.name,
          [TelemetryAttributes.JobId]: job.id,
          [TelemetryAttributes.JobName]: job.name,
        });


Comment thread src/classes/worker.ts
Comment on lines +446 to +457
return this.processFn(
  job.cloneWithOpts({
    telemetry: {
      ...job.opts.telemetry,
      metadata: job.opts.telemetry.omitContext
        ? undefined
        : srcPropagationMetadata,
    },
  }),
  token,
  signal,
);

Copilot AI Apr 23, 2026


In the sandboxed path, passing a cloned Job instance into processFn changes which Job object the sandbox bridge operates on (see sandbox.ts: progress/log/move commands call methods on the job instance passed into sandbox()). This can make progress events emit with the cloned job while completed/failed emit with the original job, and can desync in-memory fields between the two instances. Prefer keeping the original job instance for sandbox communication and only overriding the telemetry metadata in the serialized payload sent to the child.

Suggested change
- return this.processFn(
-   job.cloneWithOpts({
-     telemetry: {
-       ...job.opts.telemetry,
-       metadata: job.opts.telemetry.omitContext
-         ? undefined
-         : srcPropagationMetadata,
-     },
-   }),
-   token,
-   signal,
- );
+ const telemetry = job.opts.telemetry;
+ const originalMetadata = telemetry.metadata;
+ telemetry.metadata = telemetry.omitContext
+   ? undefined
+   : srcPropagationMetadata;
+ try {
+   return this.processFn(job, token, signal);
+ } finally {
+   telemetry.metadata = originalMetadata;
+ }
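
One caveat with a try/finally of this shape: if processFn returns a promise, returning it without await lets the finally block run before the promise settles, so the metadata is restored while processing is still in flight. Whether that matters depends on when the metadata is actually read, which is not verified here. A minimal sketch that keeps the override in place until the processor settles, using hypothetical `JobLike` and `runWithProcessMetadata` names rather than BullMQ's real types:

```typescript
interface TelemetryOpts {
  metadata?: string;
  omitContext?: boolean;
}

interface JobLike {
  opts: { telemetry: TelemetryOpts };
}

type ProcessFn<T> = (job: JobLike) => Promise<T>;

// Hypothetical helper: temporarily overrides the telemetry metadata on the
// original job instance and restores it once the processor has settled.
async function runWithProcessMetadata<T>(
  job: JobLike,
  processSpanMetadata: string | undefined,
  processFn: ProcessFn<T>,
): Promise<T> {
  const telemetry = job.opts.telemetry;
  const originalMetadata = telemetry.metadata;
  telemetry.metadata = telemetry.omitContext ? undefined : processSpanMetadata;
  try {
    // `await` here means the finally block only runs after processFn settles.
    return await processFn(job);
  } finally {
    telemetry.metadata = originalMetadata;
  }
}
```

The override is still visible to anything else that reads the same job instance while the processor runs, which is the trade-off of mutating in place instead of cloning.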

Author


This one is actually a big problem, which makes me think that cloneWithOpts might be pointless, since I want to avoid cloning a job and then cloning that job back into an original-ish job. I think it's far better to simply break the immutability of job.opts in this one case and reassign the original metadata afterwards, so it's all encapsulated in one place.

Comment thread src/classes/worker.ts
      ...job.opts.telemetry,
      metadata: job.opts.telemetry.omitContext
        ? undefined
        : srcPropagationMetadata,

Copilot AI Apr 23, 2026


callProcessJob overwrites telemetry.metadata with srcPropagationMetadata even when it is undefined/empty (e.g. when this.opts.telemetry is disabled, trace() calls the callback with no args, or when getMetadata() returns an empty string). In those cases this will clear any existing user-provided job.opts.telemetry.metadata for sandboxed workers. Guard the overwrite so it only happens when you actually have a new propagation metadata to apply (and otherwise preserve the existing metadata), while still honoring omitContext.

Suggested change
-         : srcPropagationMetadata,
+         : srcPropagationMetadata
+           ? srcPropagationMetadata
+           : job.opts.telemetry.metadata,
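
The same guard can be pulled into a small pure function, which makes the three cases (omitContext, new metadata available, nothing new) easy to test in isolation. This is only a sketch; `resolveProcessorMetadata` is a hypothetical name and not part of the PR.

```typescript
interface TelemetryOpts {
  metadata?: string;
  omitContext?: boolean;
}

// Hypothetical helper: decides which metadata the processor should see.
function resolveProcessorMetadata(
  telemetry: TelemetryOpts | undefined,
  processSpanMetadata: string | undefined,
): string | undefined {
  // omitContext always wins: the processor gets no propagation metadata.
  if (telemetry?.omitContext) {
    return undefined;
  }
  // Use the process span metadata when present; an undefined or empty value
  // falls back to whatever metadata the job already carried.
  return processSpanMetadata || telemetry?.metadata;
}
```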

Comment thread tests/job.test.ts
Comment on lines +1991 to +2024
describe('.cloneWithOpts', () => {
  it('should clone one-off debounced and delayed job', async () => {
    const sourceJob = await Job.create(
      queue,
      'test_job',
      { foo: 'bar' },
      {
        jobId: 'custom_job_id',
        delay: 1000,
        deduplication: { id: 'dedup-id' },
        debounce: { id: 'debounce-id' },
        backoff: { type: 'fixed', jitter: 0 },
        telemetry: {
          metadata: 'source_metadata',
        },
      },
    );

    const clonedJob = sourceJob.cloneWithOpts({
      telemetry: {
        metadata: 'target_metadata',
      },
    });

    expect(clonedJob.asJSON()).toEqual({
      ...sourceJob.asJSON(),
      opts: {
        ...sourceJob.asJSON().opts,
        // timestamp for cloned job is passed as opts,
        // whereas for sourceJob it's autogenerated hence the difference
        timestamp: sourceJob.timestamp,
        tm: clonedJob.asJSON().opts.tm,
      },
    });

Copilot AI Apr 23, 2026


These new .cloneWithOpts tests don't currently assert that the telemetry metadata actually changes from the source value to the target value (the expectation sets tm to clonedJob.asJSON().opts.tm, which will pass even if cloneWithOpts fails to apply the override). Add assertions that sourceJob.asJSON().opts.tm === 'source_metadata' and clonedJob.asJSON().opts.tm === 'target_metadata' (and/or that omitContext behaves as expected) so the test validates the intended behavior.

Comment on lines +426 to +452

const addSpan = startSpanSpy.returnValues[0] as MockSpan;
expect(addSpan).toBeInstanceOf(MockSpan);
expect(addSpan.name).toBe(`add ${queueName}.${job.name}`);
expect(addSpan.options?.kind).toBe(SpanKind.PRODUCER);
expect(addSpan.attributes[TelemetryAttributes.QueueName]).toBe(
  queue.name,
);
expect(addSpan.attributes[TelemetryAttributes.QueueOperation]).toBe(
  'add',
);
expect(addSpan.attributes[TelemetryAttributes.JobName]).toBe(job.name);
expect(addSpan.attributes[TelemetryAttributes.JobId]).toBe(job.id);

const processSpan = startSpanSpy.returnValues[1] as MockSpan;
expect(processSpan).toBeInstanceOf(MockSpan);
expect(processSpan.name).toBe(`process ${queueName}`);
expect(processSpan.options?.kind).toBe(SpanKind.CONSUMER);
expect(processSpan.attributes[TelemetryAttributes.WorkerId]).toBe(
  worker.id,
);
expect(processSpan.attributes[TelemetryAttributes.WorkerName]).toBe(
  'testWorker',
);
expect(processSpan.attributes[TelemetryAttributes.JobId]).toBe(job.id);

expect(processContextMetadata['getMetadata_span']).toBe(processSpan.name);

Copilot AI Apr 23, 2026


This test assumes startSpanSpy.returnValues[0] is always the add ... span and [1] is always the process ... span. Because the Worker autoruns on construction and calls getNextJob() (which is traced) concurrently with queue.add(), additional spans can be started before add/process, making these indices nondeterministic and the test flaky. Instead, locate spans by name/kind (e.g., find the first span whose name starts with add ${queueName}. and the one equal to process ${queueName}) or disable autorun and start the worker after the spy is attached.

Suggested change
- const addSpan = startSpanSpy.returnValues[0] as MockSpan;
- expect(addSpan).toBeInstanceOf(MockSpan);
- expect(addSpan.name).toBe(`add ${queueName}.${job.name}`);
- expect(addSpan.options?.kind).toBe(SpanKind.PRODUCER);
- expect(addSpan.attributes[TelemetryAttributes.QueueName]).toBe(
-   queue.name,
- );
- expect(addSpan.attributes[TelemetryAttributes.QueueOperation]).toBe(
-   'add',
- );
- expect(addSpan.attributes[TelemetryAttributes.JobName]).toBe(job.name);
- expect(addSpan.attributes[TelemetryAttributes.JobId]).toBe(job.id);
- const processSpan = startSpanSpy.returnValues[1] as MockSpan;
- expect(processSpan).toBeInstanceOf(MockSpan);
- expect(processSpan.name).toBe(`process ${queueName}`);
- expect(processSpan.options?.kind).toBe(SpanKind.CONSUMER);
- expect(processSpan.attributes[TelemetryAttributes.WorkerId]).toBe(
-   worker.id,
- );
- expect(processSpan.attributes[TelemetryAttributes.WorkerName]).toBe(
-   'testWorker',
- );
- expect(processSpan.attributes[TelemetryAttributes.JobId]).toBe(job.id);
- expect(processContextMetadata['getMetadata_span']).toBe(processSpan.name);
+ const spans = startSpanSpy.returnValues as MockSpan[];
+ const addSpan = spans.find(
+   span => span.name === `add ${queueName}.${job.name}`,
+ );
+ expect(addSpan).toBeInstanceOf(MockSpan);
+ expect(addSpan?.name).toBe(`add ${queueName}.${job.name}`);
+ expect(addSpan?.options?.kind).toBe(SpanKind.PRODUCER);
+ expect(addSpan?.attributes[TelemetryAttributes.QueueName]).toBe(
+   queue.name,
+ );
+ expect(addSpan?.attributes[TelemetryAttributes.QueueOperation]).toBe(
+   'add',
+ );
+ expect(addSpan?.attributes[TelemetryAttributes.JobName]).toBe(job.name);
+ expect(addSpan?.attributes[TelemetryAttributes.JobId]).toBe(job.id);
+ const processSpan = spans.find(
+   span => span.name === `process ${queueName}`,
+ );
+ expect(processSpan).toBeInstanceOf(MockSpan);
+ expect(processSpan?.name).toBe(`process ${queueName}`);
+ expect(processSpan?.options?.kind).toBe(SpanKind.CONSUMER);
+ expect(processSpan?.attributes[TelemetryAttributes.WorkerId]).toBe(
+   worker.id,
+ );
+ expect(processSpan?.attributes[TelemetryAttributes.WorkerName]).toBe(
+   'testWorker',
+ );
+ expect(processSpan?.attributes[TelemetryAttributes.JobId]).toBe(job.id);
+ expect(processContextMetadata['getMetadata_span']).toBe(processSpan?.name);

Comment thread src/classes/job.ts
Comment on lines +588 to +598
return new Job<DataType, ReturnType, NameType>(
  this.queue,
  this.name,
  this.data,
  {
    ...this.opts,
    timestamp: this.timestamp,
    ...opts,
  },
  this.id,
);

Copilot AI Apr 23, 2026


cloneWithOpts currently constructs a brand-new Job instance via new Job(...), which reinitializes runtime state (e.g. progress, attemptsMade, attemptsStarted, stalledCounter, processedOn, finishedOn, failedReason, stacktrace, etc.). Since this method is now used for sandbox processing, the JSON sent to the child (via asJSONSandbox()) can lose these fields, changing observable behavior inside processors (and potentially retry/attempt logic). Consider cloning by copying the existing instance fields and only replacing opts (or provide an API to generate asJSONSandbox() with overridden opts) so the cloned job preserves all current job state.

Suggested change
- return new Job<DataType, ReturnType, NameType>(
-   this.queue,
-   this.name,
-   this.data,
-   {
-     ...this.opts,
-     timestamp: this.timestamp,
-     ...opts,
-   },
-   this.id,
- );
+ const cloned = Object.assign(
+   Object.create(Object.getPrototypeOf(this)),
+   this,
+ ) as Job<DataType, ReturnType, NameType>;
+ cloned.opts = {
+   ...this.opts,
+   timestamp: this.timestamp,
+   ...opts,
+ };
+ return cloned;
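
The difference between the two cloning strategies can be seen on a toy class. `ToyJob` below is a hypothetical stand-in with only a couple of runtime fields, not BullMQ's Job; the sketch only illustrates that a constructor-based clone resets runtime state while a prototype-preserving copy keeps it.

```typescript
// Hypothetical stand-in for a job with runtime state set after construction.
class ToyJob {
  attemptsMade = 0;
  progress: number = 0;

  constructor(
    public name: string,
    public opts: { telemetry?: { metadata?: string } },
  ) {}

  describe(): string {
    return `${this.name} (attempt ${this.attemptsMade})`;
  }
}

type ToyOpts = ToyJob['opts'];

// Re-runs the constructor, so attemptsMade and progress go back to defaults.
function cloneViaConstructor(job: ToyJob, opts: ToyOpts): ToyJob {
  return new ToyJob(job.name, { ...job.opts, ...opts });
}

// Copies own fields onto a new object with the same prototype, then swaps opts.
// Note that this is a shallow copy: nested objects other than opts are shared.
function clonePreservingState(job: ToyJob, opts: ToyOpts): ToyJob {
  const cloned: ToyJob = Object.assign(
    Object.create(Object.getPrototypeOf(job)),
    job,
  );
  cloned.opts = { ...job.opts, ...opts };
  return cloned;
}
```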


3 participants