Skip to content

Commit d97ffde

Browse files
torosent and Copilot committed
feat: add retroactive span emission model for activity/sub-orch completion
Implement the retroactive span emission pattern matching the .NET SDK's EmitTraceActivityForTaskCompleted/Failed and EmitTraceActivityForTimer:

- emitRetroactiveActivityClientSpan(): Creates Client spans at activity completion/failure time, with a historical startTime taken from the TaskScheduled event.
- emitRetroactiveSubOrchClientSpan(): Does the same for sub-orchestration completions.
- emitSpanForTimer(): Now accepts an optional startTime parameter so the span covers the creation-to-fired duration.
- processNewEventsForTracing(): Pre-processes new history events (before the orchestrator executor runs) to emit retroactive spans, matching .NET's worker-level tracing pattern.

This addresses the architectural gap where JS emitted scheduling spans only at scheduling time (proactive), while .NET and Java emit retroactive spans at completion time with an accurate scheduling-to-completion duration.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 75668d6 commit d97ffde

4 files changed

Lines changed: 600 additions & 0 deletions

File tree

packages/durabletask-js/src/tracing/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,7 @@ export {
3838
setOrchestrationStatusFromActions,
3939
createOrchestrationTraceContextPb,
4040
processActionsForTracing,
41+
emitRetroactiveActivityClientSpan,
42+
emitRetroactiveSubOrchClientSpan,
43+
processNewEventsForTracing,
4144
} from "./trace-helper";

packages/durabletask-js/src/tracing/trace-helper.ts

Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,7 @@ export function emitSpanForTimer(
321321
fireAt: Date,
322322
timerId: number,
323323
instanceId?: string,
324+
startTime?: Date,
324325
): void {
325326
const ctx = getTracingContext();
326327
if (!ctx) return;
@@ -332,6 +333,7 @@ export function emitSpanForTimer(
332333
spanName,
333334
{
334335
kind: ctx.otel.SpanKind.INTERNAL,
336+
startTime: startTime,
335337
attributes: {
336338
[DurableTaskAttributes.TYPE]: TaskType.TIMER,
337339
[DurableTaskAttributes.TASK_NAME]: orchestrationName,
@@ -346,6 +348,221 @@ export function emitSpanForTimer(
346348
span.end();
347349
}
348350

351+
/**
 * Emits a retroactive Client-kind span for a completed or failed activity task.
 *
 * The span is started and ended immediately, but is given `startTime` as its start
 * timestamp so that it covers the scheduling-to-completion interval. This matches the
 * .NET SDK pattern (EmitTraceActivityForTaskCompleted/Failed), where client spans are
 * emitted at completion time using the original TaskScheduled event timestamp.
 *
 * Does nothing when no tracing context is available.
 *
 * @param orchestrationSpan - The parent orchestration span.
 * @param taskName - The activity name.
 * @param version - The activity version (optional); only recorded as an attribute when non-empty.
 * @param instanceId - The orchestration instance ID.
 * @param taskId - The task's sequential ID.
 * @param startTime - The scheduling timestamp from the TaskScheduled history event.
 * @param failureMessage - If the task failed, the error message. Any provided value,
 *   including an empty string, marks the span as an error.
 */
export function emitRetroactiveActivityClientSpan(
  orchestrationSpan: Span,
  taskName: string,
  version: string | undefined,
  instanceId: string,
  taskId: number,
  startTime?: Date,
  failureMessage?: string,
): void {
  const ctx = getTracingContext();
  if (!ctx) return;

  const spanName = createSpanName(TaskType.ACTIVITY, taskName, version);
  // Parent the new span under the orchestration span rather than whatever is currently active.
  const parentContext = ctx.otel.trace.setSpan(ctx.otel.context.active(), orchestrationSpan);

  const span = ctx.tracer.startSpan(
    spanName,
    {
      kind: ctx.otel.SpanKind.CLIENT,
      startTime,
      attributes: {
        [DurableTaskAttributes.TYPE]: TaskType.ACTIVITY,
        [DurableTaskAttributes.TASK_NAME]: taskName,
        [DurableTaskAttributes.TASK_INSTANCE_ID]: instanceId,
        [DurableTaskAttributes.TASK_TASK_ID]: taskId,
        ...(version ? { [DurableTaskAttributes.TASK_VERSION]: version } : {}),
      },
    },
    parentContext,
  );

  // Test for presence, not truthiness: a failure reported with an empty message
  // must still be recorded as an error rather than silently looking like success.
  if (failureMessage != null) {
    span.setStatus({ code: ctx.otel.SpanStatusCode.ERROR, message: failureMessage });
  }

  span.end();
}
402+
403+
/**
 * Emits a retroactive Client-kind span for a completed or failed sub-orchestration.
 *
 * The span is started and ended immediately, but is given `startTime` as its start
 * timestamp so that it covers the creation-to-completion interval. Matches the .NET
 * SDK's EmitTraceActivityForSubOrchestrationCompleted/Failed pattern.
 *
 * Does nothing when no tracing context is available.
 *
 * @param orchestrationSpan - The parent orchestration span.
 * @param subOrchName - The sub-orchestration name.
 * @param version - The sub-orchestration version (optional); only recorded as an attribute when non-empty.
 * @param instanceId - The parent orchestration instance ID.
 * @param startTime - The scheduling timestamp from the SubOrchestrationInstanceCreated event.
 * @param failureMessage - If the sub-orchestration failed, the error message. Any provided
 *   value, including an empty string, marks the span as an error.
 */
export function emitRetroactiveSubOrchClientSpan(
  orchestrationSpan: Span,
  subOrchName: string,
  version: string | undefined,
  instanceId: string,
  startTime?: Date,
  failureMessage?: string,
): void {
  const ctx = getTracingContext();
  if (!ctx) return;

  const spanName = createSpanName(TaskType.ORCHESTRATION, subOrchName, version);
  // Parent the new span under the orchestration span rather than whatever is currently active.
  const parentContext = ctx.otel.trace.setSpan(ctx.otel.context.active(), orchestrationSpan);

  const span = ctx.tracer.startSpan(
    spanName,
    {
      kind: ctx.otel.SpanKind.CLIENT,
      startTime,
      attributes: {
        [DurableTaskAttributes.TYPE]: TaskType.ORCHESTRATION,
        [DurableTaskAttributes.TASK_NAME]: subOrchName,
        [DurableTaskAttributes.TASK_INSTANCE_ID]: instanceId,
        ...(version ? { [DurableTaskAttributes.TASK_VERSION]: version } : {}),
      },
    },
    parentContext,
  );

  // Test for presence, not truthiness: a failure reported with an empty message
  // must still be recorded as an error rather than silently looking like success.
  if (failureMessage != null) {
    span.setStatus({ code: ctx.otel.SpanStatusCode.ERROR, message: failureMessage });
  }

  span.end();
}
449+
450+
/**
 * Processes new history events to emit retroactive spans for completed/failed tasks,
 * sub-orchestrations, and fired timers. This follows the .NET SDK pattern where the
 * worker emits these spans before the orchestrator executor runs.
 *
 * Scheduling/creation events are looked up in `pastEvents` only, keyed by event ID.
 * A task or sub-orchestration completion/failure whose scheduling event is not found
 * there is skipped. Timer spans are always emitted; when the matching TimerCreated
 * event is missing, no explicit start time is passed.
 *
 * Does nothing when `orchestrationSpan` is absent or no tracing context is available.
 *
 * @param orchestrationSpan - The orchestration span (parent for retroactive spans).
 * @param pastEvents - The past (replay) history events to look up scheduling events.
 * @param newEvents - The new history events to process for completions/failures.
 * @param instanceId - The orchestration instance ID.
 * @param orchestrationName - The orchestration name (for timer spans).
 */
export function processNewEventsForTracing(
  orchestrationSpan: Span | undefined | null,
  pastEvents: pb.HistoryEvent[],
  newEvents: pb.HistoryEvent[],
  instanceId: string,
  orchestrationName: string,
): void {
  if (!orchestrationSpan) return;
  if (!getTracingContext()) return;

  // Build lookup maps from past events, keyed by the scheduling event's ID.
  const taskScheduledEvents = new Map<number, pb.HistoryEvent>();
  const subOrchCreatedEvents = new Map<number, pb.HistoryEvent>();
  const timerCreatedEvents = new Map<number, pb.HistoryEvent>();

  for (const event of pastEvents) {
    const eventId = event.getEventid();
    if (event.hasTaskscheduled()) {
      taskScheduledEvents.set(eventId, event);
    } else if (event.hasSuborchestrationinstancecreated()) {
      subOrchCreatedEvents.set(eventId, event);
    } else if (event.hasTimercreated()) {
      timerCreatedEvents.set(eventId, event);
    }
  }

  // Process new events for completions, failures, and timer firings.
  for (const newEvent of newEvents) {
    if (newEvent.hasTaskcompleted()) {
      const taskCompleted = newEvent.getTaskcompleted()!;
      const scheduledEvent = taskScheduledEvents.get(taskCompleted.getTaskscheduledid());
      if (scheduledEvent) {
        const taskScheduled = scheduledEvent.getTaskscheduled()!;
        emitRetroactiveActivityClientSpan(
          orchestrationSpan,
          taskScheduled.getName(),
          taskScheduled.getVersion()?.getValue(),
          instanceId,
          scheduledEvent.getEventid(),
          scheduledEvent.getTimestamp()?.toDate(),
        );
      }
    } else if (newEvent.hasTaskfailed()) {
      const taskFailed = newEvent.getTaskfailed()!;
      const scheduledEvent = taskScheduledEvents.get(taskFailed.getTaskscheduledid());
      if (scheduledEvent) {
        const taskScheduled = scheduledEvent.getTaskscheduled()!;
        // Use `||`, not `??`: an unset or empty error message comes back as "" from the
        // generated getter, and a failed task must always carry a non-empty message so
        // the span is marked as an error.
        const failureMessage =
          taskFailed.getFailuredetails()?.getErrormessage() || "Unspecified task activity failure";
        emitRetroactiveActivityClientSpan(
          orchestrationSpan,
          taskScheduled.getName(),
          taskScheduled.getVersion()?.getValue(),
          instanceId,
          scheduledEvent.getEventid(),
          scheduledEvent.getTimestamp()?.toDate(),
          failureMessage,
        );
      }
    } else if (newEvent.hasSuborchestrationinstancecompleted()) {
      const subOrchCompleted = newEvent.getSuborchestrationinstancecompleted()!;
      const createdEvent = subOrchCreatedEvents.get(subOrchCompleted.getTaskscheduledid());
      if (createdEvent) {
        const subOrchCreated = createdEvent.getSuborchestrationinstancecreated()!;
        emitRetroactiveSubOrchClientSpan(
          orchestrationSpan,
          subOrchCreated.getName(),
          subOrchCreated.getVersion()?.getValue(),
          instanceId,
          createdEvent.getTimestamp()?.toDate(),
        );
      }
    } else if (newEvent.hasSuborchestrationinstancefailed()) {
      const subOrchFailed = newEvent.getSuborchestrationinstancefailed()!;
      const createdEvent = subOrchCreatedEvents.get(subOrchFailed.getTaskscheduledid());
      if (createdEvent) {
        const subOrchCreated = createdEvent.getSuborchestrationinstancecreated()!;
        // Same reasoning as for task failures: fall back on empty as well as missing messages.
        const failureMessage =
          subOrchFailed.getFailuredetails()?.getErrormessage() || "Unspecified sub-orchestration failure";
        emitRetroactiveSubOrchClientSpan(
          orchestrationSpan,
          subOrchCreated.getName(),
          subOrchCreated.getVersion()?.getValue(),
          instanceId,
          createdEvent.getTimestamp()?.toDate(),
          failureMessage,
        );
      }
    } else if (newEvent.hasTimerfired()) {
      const timerFired = newEvent.getTimerfired()!;
      const timerId = timerFired.getTimerid();
      // The TimerCreated event is optional here; without it the span gets no explicit start time.
      const createdEvent = timerCreatedEvents.get(timerId);
      // Fall back to the current time when the event carries no fire-at timestamp.
      const fireAt = timerFired.getFireat()?.toDate() ?? new Date();
      emitSpanForTimer(
        orchestrationSpan,
        orchestrationName,
        fireAt,
        timerId,
        instanceId,
        createdEvent?.getTimestamp()?.toDate(),
      );
    }
  }
}
565+
349566
/**
350567
* Emits a span for sending an event to another orchestration.
351568
*

packages/durabletask-js/src/worker/task-hub-grpc-worker.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import {
3030
processActionsForTracing,
3131
createOrchestrationTraceContextPb,
3232
setOrchestrationStatusFromActions,
33+
processNewEventsForTracing,
3334
setSpanError,
3435
setSpanOk,
3536
endSpan,
@@ -654,6 +655,20 @@ export class TaskHubGrpcWorker {
654655
? startSpanForOrchestrationExecution(executionStartedProtoEvent, orchTraceContext, instanceId)
655656
: undefined;
656657

658+
// Emit retroactive spans for tasks/sub-orchestrations that completed/failed and timers
659+
// that fired. This follows the .NET SDK pattern where these spans are emitted from
660+
// history events BEFORE the orchestrator executor runs.
661+
if (tracingResult) {
662+
const orchName = executionStartedProtoEvent?.getName() ?? "";
663+
processNewEventsForTracing(
664+
tracingResult.span,
665+
req.getPasteventsList(),
666+
req.getNeweventsList(),
667+
instanceId,
668+
orchName,
669+
);
670+
}
671+
657672
let res;
658673

659674
try {

0 commit comments

Comments
 (0)