From 20f8fb267dddc8db1e05b71090f19ccfd1ac3890 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Tue, 26 May 2026 10:35:41 -0700 Subject: [PATCH 01/12] Add Temporal Operation Handler --- .../src/__tests__/test-nexus-token-helpers.ts | 37 +- packages/nexus/src/index.ts | 4 + packages/nexus/src/token.ts | 73 +++- packages/nexus/src/workflow-helpers.ts | 186 +++++++- .../test/src/test-nexus-temporal-operation.ts | 410 ++++++++++++++++++ 5 files changed, 672 insertions(+), 38 deletions(-) create mode 100644 packages/test/src/test-nexus-temporal-operation.ts diff --git a/packages/nexus/src/__tests__/test-nexus-token-helpers.ts b/packages/nexus/src/__tests__/test-nexus-token-helpers.ts index 6d159e3ec3..367949c879 100644 --- a/packages/nexus/src/__tests__/test-nexus-token-helpers.ts +++ b/packages/nexus/src/__tests__/test-nexus-token-helpers.ts @@ -1,5 +1,10 @@ import test from 'ava'; -import { base64URLEncodeNoPadding, generateWorkflowRunOperationToken, loadWorkflowRunOperationToken } from '../token'; +import { + base64URLEncodeNoPadding, + generateWorkflowRunOperationToken, + loadOperationToken, + loadWorkflowRunOperationToken, +} from '../token'; test('encode and decode workflow run Operation token', (t) => { const expected = { @@ -12,28 +17,30 @@ test('encode and decode workflow run Operation token', (t) => { t.deepEqual(decoded, expected); }); -test('decode workflow run Operation token errors', (t) => { - t.throws(() => loadWorkflowRunOperationToken(''), { message: /invalid workflow run token: token is empty/ }); +test('decode Operation token errors', (t) => { + t.throws(() => loadOperationToken(''), { message: /invalid operation token: token is empty/ }); - t.throws(() => loadWorkflowRunOperationToken('not-base64!@#$'), { message: /failed to decode token/ }); + t.throws(() => loadOperationToken('not-base64!@#$'), { message: /failed to decode token/ }); const invalidJSONToken = base64URLEncodeNoPadding('invalid json'); - t.throws(() => loadWorkflowRunOperationToken(invalidJSONToken), { - message: /failed to unmarshal workflow run Operation token/, + t.throws(() => loadOperationToken(invalidJSONToken), { + message: /failed to unmarshal Operation token/, }); - const invalidTypeToken = base64URLEncodeNoPadding('{"t":2}'); - t.throws(() => loadWorkflowRunOperationToken(invalidTypeToken), { - message: /invalid workflow token type: 2, expected: 1/, + const versionedToken = base64URLEncodeNoPadding('{"v":1, "t":1,"wid": "workflow-id"}'); + t.throws(() => loadOperationToken(versionedToken), { + message: /invalid operation token: "v" field should not be present/, }); +}); - const missingWIDToken = base64URLEncodeNoPadding('{"t":1}'); - t.throws(() => loadWorkflowRunOperationToken(missingWIDToken), { - message: /invalid workflow run token: missing workflow ID \(wid\)/, +test('decode workflow run Operation token errors', (t) => { + const invalidTypeToken = base64URLEncodeNoPadding('{"t":2,"ns":"ns"}'); + t.throws(() => loadOperationToken(invalidTypeToken), { + message: /invalid operation token: unknown token type: 2/, }); - const versionedToken = base64URLEncodeNoPadding('{"v":1, "t":1,"wid": "workflow-id"}'); - t.throws(() => loadWorkflowRunOperationToken(versionedToken), { - message: /invalid workflow run token: "v" field should not be present/, + const missingWIDToken = base64URLEncodeNoPadding('{"t":1,"ns":"ns"}'); + t.throws(() => loadWorkflowRunOperationToken(missingWIDToken), { + message: /invalid workflow run token: missing workflow ID \(wid\)/, }); }); diff --git a/packages/nexus/src/index.ts b/packages/nexus/src/index.ts index f469ea4774..e9ca7f3d52 100644 --- a/packages/nexus/src/index.ts +++ b/packages/nexus/src/index.ts @@ -15,6 +15,10 @@ export { export { startWorkflow, + TemporalOperationHandler, + TemporalOperationResult, + type TemporalNexusClient, + type TemporalOperationStartHandler, WorkflowHandle, WorkflowRunOperationHandler, WorkflowRunOperationStartHandler, diff --git a/packages/nexus/src/token.ts b/packages/nexus/src/token.ts index 9db3372d02..762c573287 100644 --- a/packages/nexus/src/token.ts +++ b/packages/nexus/src/token.ts @@ -5,35 +5,45 @@ * @internal * @hidden */ -export interface WorkflowRunOperationToken { +export interface OperationToken { /** - * Version of the token, by default we assume we're on version 1, this field is not emitted as part of the output, + * Version of the token, by default we assume we're on version 0, this field is not emitted as part of the output, * it's only used to reject newer token versions on load. */ v?: number; /** - * Type of the Operation. Must be OPERATION_TOKEN_TYPE_WORKFLOW_RUN. + * Type of the Operation. */ t: OperationTokenType; /** - * Namespace of the workflow. + * Namespace of the operation. */ ns: string; /** * ID of the workflow. */ + wid?: string; +} + +/** + * @internal + * @hidden + */ +export interface WorkflowRunOperationToken extends OperationToken { + t: typeof OperationTokenType.WORKFLOW_RUN; wid: string; } -type OperationTokenType = (typeof OperationTokenType)[keyof typeof OperationTokenType]; + +export type OperationTokenType = (typeof OperationTokenType)[keyof typeof OperationTokenType]; /** * @internal * @hidden */ -const OperationTokenType = { +export const OperationTokenType = { WORKFLOW_RUN: 1, } as const; @@ -50,11 +60,11 @@ export function generateWorkflowRunOperationToken(namespace: string, workflowId: } /** - * Load and validate a workflow run Operation token. + * Load and validate the common fields of an Operation token. */ -export function loadWorkflowRunOperationToken(data: string): WorkflowRunOperationToken { +export function loadOperationToken(data: string): OperationToken { if (!data) { - throw new TypeError('invalid workflow run token: token is empty'); + throw new TypeError('invalid operation token: token is empty'); } let decoded: string; @@ -64,26 +74,57 @@ export function loadWorkflowRunOperationToken(data: string): WorkflowRunOperatio throw new TypeError('failed to decode token', { cause: err }); } - let token: WorkflowRunOperationToken; + let token: OperationToken; try { token = JSON.parse(decoded); } catch (err) { - throw new TypeError('failed to unmarshal workflow run Operation token', { cause: err }); + throw new TypeError('failed to unmarshal Operation token', { cause: err }); } - if (token.t !== OperationTokenType.WORKFLOW_RUN) { - throw new TypeError(`invalid workflow token type: ${token.t}, expected: ${OperationTokenType.WORKFLOW_RUN}`); + if (typeof token !== 'object' || token == null) { + throw new TypeError(`invalid operation token: expected object, got ${typeof token}`); } if (token.v !== undefined && token.v !== 0) { - throw new TypeError('invalid workflow run token: "v" field should not be present'); + throw new TypeError('invalid operation token: "v" field should not be present'); } - if (!token.wid) { - throw new TypeError('invalid workflow run token: missing workflow ID (wid)'); + if (typeof token.t !== 'number') { + throw new TypeError(`invalid operation token: expected token type to be a number, got ${typeof token.t}`); } + if (!isOperationTokenType(token.t)) { + throw new TypeError(`invalid operation token: unknown token type: ${token.t}`); + } + if (typeof token.ns !== 'string') { + throw new TypeError(`invalid operation token: expected namespace to be a string, got ${typeof token.ns}`); + } + + return token; +} +/** + * Load and validate a workflow run Operation token. + */ +export function loadWorkflowRunOperationToken(data: string): WorkflowRunOperationToken { + const token = loadOperationToken(data); + assertWorkflowRunOperationToken(token); return token; } +/** + * Assert that an OperationToken identifies a workflow run. + */ +export function assertWorkflowRunOperationToken(token: OperationToken): asserts token is WorkflowRunOperationToken { + if (token.t !== OperationTokenType.WORKFLOW_RUN) { + throw new TypeError(`invalid workflow token type: ${token.t}, expected: ${OperationTokenType.WORKFLOW_RUN}`); + } + if (!token.wid || typeof token.wid !== 'string') { + throw new TypeError('invalid workflow run token: missing workflow ID (wid)'); + } +} + +function isOperationTokenType(value: number): value is OperationTokenType { + return Object.values(OperationTokenType).includes(value as OperationTokenType); +} + // Exported for use in tests. export function base64URLEncodeNoPadding(str: string): string { const base64 = Buffer.from(str).toString('base64url'); diff --git a/packages/nexus/src/workflow-helpers.ts b/packages/nexus/src/workflow-helpers.ts index 2771fc61b8..dd395def7d 100644 --- a/packages/nexus/src/workflow-helpers.ts +++ b/packages/nexus/src/workflow-helpers.ts @@ -1,17 +1,18 @@ import * as nexus from 'nexus-rpc'; import type { Workflow, WorkflowResultType } from '@temporalio/common'; import type { Replace } from '@temporalio/common/lib/type-helpers'; -import type { WorkflowStartOptions as ClientWorkflowStartOptions } from '@temporalio/client'; +import type { Client, WorkflowStartOptions as ClientWorkflowStartOptions } from '@temporalio/client'; import { type temporal } from '@temporalio/proto'; import type { InternalWorkflowStartOptions } from '@temporalio/client/lib/internal'; import { InternalWorkflowStartOptionsSymbol } from '@temporalio/client/lib/internal'; -import { generateWorkflowRunOperationToken, loadWorkflowRunOperationToken } from './token'; +import { convertNexusLinkToTemporalLink, convertTemporalLinkToNexusLink } from './link-converter'; import { - convertNexusLinkToTemporalLink, - convertNexusLinkToWorkflowEventLink, - convertTemporalLinkToNexusLink, - convertWorkflowEventLinkToNexusLink, -} from './link-converter'; + assertWorkflowRunOperationToken, + generateWorkflowRunOperationToken, + loadOperationToken, + loadWorkflowRunOperationToken, + OperationTokenType, +} from './token'; import { getClient, getHandlerContext, log } from './context'; declare const isNexusWorkflowHandle: unique symbol; @@ -159,3 +160,174 @@ export class WorkflowRunOperationHandler implements nexus.OperationHandler await getClient().workflow.getHandle(decoded.wid).cancel(); } } + +/** + * Module-private brand and conversion key for {@link TemporalOperationResult}. Doubles as the + * type-level brand (so external code can't satisfy the interface) and the runtime accessor + * (since the symbol is in scope inside this module). + */ +const toHandlerResult: unique symbol = Symbol.for('__temporal_nexus_TemporalOperationResult_toHandlerResult__'); + +/** + * A result produced by a {@link TemporalOperationHandler}. Construct via + * {@link TemporalOperationResult.sync} or {@link TemporalOperationResult.async}. + + * @experimental Nexus support in Temporal SDK is experimental. + */ +export interface TemporalOperationResult { + readonly [toHandlerResult]: () => nexus.HandlerStartOperationResult; +} + +export const TemporalOperationResult = { + sync(value: T): TemporalOperationResult { + return { + [toHandlerResult](): nexus.HandlerStartOperationResult { + return nexus.HandlerStartOperationResult.sync(value); + }, + }; + }, + + async(token: string): TemporalOperationResult { + return { + [toHandlerResult](): nexus.HandlerStartOperationResult { + return nexus.HandlerStartOperationResult.async(token); + }, + }; + }, +}; + +/** + * A Nexus-aware Temporal Client for use inside {@link TemporalOperationHandler} implementations. + * + * @experimental Nexus support in Temporal SDK is experimental. + */ +export interface TemporalNexusClient { + /** + * The Temporal Client for the active Nexus Operation. + * + * @experimental Nexus support in Temporal SDK is experimental. + */ + readonly client: Client; + + /** + * Starts a workflow run as the asynchronous backing operation for the current Nexus Operation. + * + * @experimental Nexus support in Temporal SDK is experimental. + */ + startWorkflow( + workflowTypeOrFunc: string | T, + workflowOptions: WorkflowStartOptions + ): Promise>>; +} + +class TemporalNexusClientImpl implements TemporalNexusClient { + private asyncOperationStarted = false; + + constructor(private readonly startOperationContext: nexus.StartOperationContext) {} + + /** + * The Temporal Client for the active Nexus Operation. + * + * @experimental Nexus support in Temporal SDK is experimental. + */ + public get client(): Client { + return getClient(); + } + + /** + * Starts a workflow run as the asynchronous backing operation for the current Nexus Operation. + * + * @experimental Nexus support in Temporal SDK is experimental. + */ + public async startWorkflow( + workflowTypeOrFunc: string | T, + workflowOptions: WorkflowStartOptions + ): Promise>> { + return await this.withAsyncOperationStartReservation(async () => { + const handle = await startWorkflow(this.startOperationContext, workflowTypeOrFunc, workflowOptions); + const { namespace } = getHandlerContext(); + return TemporalOperationResult.async(generateWorkflowRunOperationToken(namespace, handle.workflowId)); + }); + } + + private async withAsyncOperationStartReservation(fn: () => Promise): Promise { + if (this.asyncOperationStarted) { + throw new nexus.HandlerError( + 'BAD_REQUEST', + 'Only one async operation can be started per operation handler invocation. Use TemporalNexusClient.client for additional workflow interactions' + ); + } + + this.asyncOperationStarted = true; + try { + return await fn(); + } catch (err) { + this.asyncOperationStarted = false; + throw err; + } + } +} + +/** + * A handler function for the {@link TemporalOperationHandler} constructor. + * + * @experimental Nexus support in Temporal SDK is experimental. + */ +export type TemporalOperationStartHandler = ( + ctx: nexus.StartOperationContext, + client: TemporalNexusClient, + input: I +) => Promise>; + +/** + * A Nexus Operation implementation for operations that interact with Temporal. + * + * @experimental Nexus support in Temporal SDK is experimental. + */ +export class TemporalOperationHandler implements nexus.OperationHandler { + constructor(readonly handler: TemporalOperationStartHandler) {} + + async start(ctx: nexus.StartOperationContext, input: I): Promise> { + const result = await this.handler(ctx, new TemporalNexusClientImpl(ctx), input); + return result[toHandlerResult](); + } + + async cancel(_ctx: nexus.CancelOperationContext, token: string): Promise { + const { namespace } = getHandlerContext(); + let opToken; + try { + opToken = loadOperationToken(token); + } catch (err) { + throw new nexus.HandlerError(nexus.HandlerErrorType.BAD_REQUEST, 'invalid operation token', { cause: err }); + } + + if (opToken.ns !== namespace) { + throw new nexus.HandlerError( + nexus.HandlerErrorType.BAD_REQUEST, + `Client namespace ${namespace} does not match operation token namespace ${opToken.ns}` + ); + } + switch (opToken.t) { + case OperationTokenType.WORKFLOW_RUN: + assertWorkflowRunOperationToken(opToken); + await this.cancelWorkflowRun(_ctx, opToken.wid); + return; + default: + throw new nexus.HandlerError( + nexus.HandlerErrorType.BAD_REQUEST, + `Unsupported operation token type: ${opToken.t}` + ); + } + } + + /** + * Cancel the workflow backing this Nexus Operation. + * + * Override this method to customize workflow cancellation behavior. + * + * @experimental Nexus support in Temporal SDK is experimental. + */ + protected async cancelWorkflowRun(_ctx: nexus.CancelOperationContext, workflowId: string): Promise { + await getClient().workflow.getHandle(workflowId).cancel(); + } +} diff --git a/packages/test/src/test-nexus-temporal-operation.ts b/packages/test/src/test-nexus-temporal-operation.ts new file mode 100644 index 0000000000..7af8aab201 --- /dev/null +++ b/packages/test/src/test-nexus-temporal-operation.ts @@ -0,0 +1,410 @@ +import assert from 'assert'; +import { randomUUID } from 'crypto'; +import * as nexus from 'nexus-rpc'; +import { NexusOperationFailure } from '@temporalio/common'; +import { + NexusOperationExecutionStatus, + WorkflowExecutionAlreadyStartedError, + WorkflowFailedError, +} from '@temporalio/client'; +import * as temporalnexus from '@temporalio/nexus'; +import { asyncLocalStorage } from '@temporalio/nexus/lib/context'; +import { base64URLEncodeNoPadding } from '@temporalio/nexus/lib/token'; +import * as workflow from '@temporalio/workflow'; +import { helpers, makeTestFunction } from './helpers-integration'; +import { innermostHandlerError } from './helpers-nexus'; +import { waitUntil } from './helpers'; + +const test = makeTestFunction({ + workflowsPath: __filename, + workflowEnvironmentOpts: { + server: { + extraArgs: [ + '--dynamic-config-value', + 'nexusoperation.enableStandalone=true', + '--dynamic-config-value', + 'system.refreshNexusEndpointsMinWait="0s"', + '--dynamic-config-value', + 'history.enableChasmCallbacks=true', + ], + }, + }, +}); + +//////////////////////////////////////////////////////////////////////////////////////////////////// +// Service definitions + +const temporalOpService = nexus.service('temporalOperationService', { + asyncOp: nexus.operation(), + syncOp: nexus.operation(), + doubleStartOp: nexus.operation(), + retryAfterFailedStartOp: nexus.operation(), +}); + +const temporalCancelOpService = nexus.service('temporalCancelOperationService', { + blockingOp: nexus.operation(), +}); + +type TemporalOpServiceHandlers = nexus.ServiceHandlerFor; +type TemporalCancelOpServiceHandlers = nexus.ServiceHandlerFor; + +function unusedTemporalOperationHandler(): nexus.OperationHandler { + return new temporalnexus.TemporalOperationHandler(async () => { + throw new nexus.HandlerError('NOT_IMPLEMENTED', 'not used by this test'); + }); +} + +function makeTemporalOpServiceHandler(overrides: Partial) { + const handlers: TemporalOpServiceHandlers = { + asyncOp: unusedTemporalOperationHandler(), + syncOp: unusedTemporalOperationHandler(), + doubleStartOp: unusedTemporalOperationHandler(), + retryAfterFailedStartOp: unusedTemporalOperationHandler(), + ...overrides, + }; + return nexus.serviceHandler(temporalOpService, handlers); +} + +function makeTemporalCancelOpServiceHandler(handlers: TemporalCancelOpServiceHandlers) { + return nexus.serviceHandler(temporalCancelOpService, handlers); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// +// Caller workflows + +export async function temporalAsyncOpCaller(endpoint: string): Promise { + const client = workflow.createNexusServiceClient({ endpoint, service: temporalOpService }); + return await client.executeOperation('asyncOp', 'hello'); +} + +export async function temporalSyncOpCaller(endpoint: string): Promise { + const client = workflow.createNexusServiceClient({ endpoint, service: temporalOpService }); + return await client.executeOperation('syncOp', 'hello'); +} + +export async function temporalDoubleStartOpCaller(endpoint: string): Promise { + const client = workflow.createNexusServiceClient({ endpoint, service: temporalOpService }); + return await client.executeOperation('doubleStartOp', 'hello'); +} + +export async function temporalRetryAfterFailedStartOpCaller(endpoint: string, workflowId: string): Promise { + const client = workflow.createNexusServiceClient({ endpoint, service: temporalOpService }); + return await client.executeOperation('retryAfterFailedStartOp', workflowId); +} + +export async function temporalDefaultCancelWorkflowCaller(endpoint: string, targetWorkflowId: string): Promise { + const client = workflow.createNexusServiceClient({ endpoint, service: temporalCancelOpService }); + try { + await client.executeOperation('blockingOp', targetWorkflowId, { + cancellationType: 'WAIT_CANCELLATION_COMPLETED', + }); + } catch (err) { + if (workflow.isCancellation(err)) return; + throw err; + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// +// Target workflows + +export async function echoWorkflow(input: string): Promise { + return input; +} + +export async function blockingTargetWorkflow(): Promise { + await workflow.condition(() => false); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// +// Tests + +test('TemporalOperationHandler infers correct output type from typed workflow function', async (t) => { + const _stringOp: nexus.OperationHandler = new temporalnexus.TemporalOperationHandler( + async (_ctx, client, input: string) => { + return await client.startWorkflow(echoWorkflow, { + args: [input], + workflowId: 'test', + }); + } + ); + + // @ts-expect-error - Output type should be string, not number + const _mismatchedOp: nexus.OperationHandler = new temporalnexus.TemporalOperationHandler( + async (_ctx, client, input: string) => { + return await client.startWorkflow(echoWorkflow, { + args: [input], + workflowId: 'test', + }); + } + ); + + const _syncOp: nexus.OperationHandler = new temporalnexus.TemporalOperationHandler( + async (_ctx, _client, input: string) => { + return temporalnexus.TemporalOperationResult.sync(input); + } + ); + + const _explicitStringOp: nexus.OperationHandler = new temporalnexus.TemporalOperationHandler< + string, + string + >(async (_ctx, client, input) => { + return await client.startWorkflow(echoWorkflow, { + args: [input], + workflowId: 'test', + }); + }); + + // This test only checks for compile-time errors. + t.pass(); +}); + +test('TemporalOperationHandler cancel delegates to overridable cancelWorkflow', async (t) => { + const { createWorker, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + const workflowId = randomUUID(); + + let customCancelCalled = false; + + class CustomCancelHandler extends temporalnexus.TemporalOperationHandler { + async cancelWorkflowRun(_ctx: nexus.CancelOperationContext, workflowId: string): Promise { + const handle = temporalnexus.getClient().workflow.getHandle(workflowId); + await handle.cancel(); + customCancelCalled = true; + } + } + + const worker = await createWorker({ + nexusServices: [ + makeTemporalCancelOpServiceHandler({ + blockingOp: new CustomCancelHandler(async (_ctx, client, workflowId) => { + return await client.startWorkflow(blockingTargetWorkflow, { + workflowId, + }); + }), + }), + ], + }); + + await worker.runUntil(async () => { + const serviceClient = t.context.env.client.nexus.createServiceClient({ + endpoint: endpointName, + service: temporalCancelOpService, + }); + const operation = await serviceClient.startOperation(temporalCancelOpService.operations.blockingOp, workflowId, { + id: 'op-' + randomUUID(), + scheduleToCloseTimeout: '60s', + }); + const workflowHandle = t.context.env.client.workflow.getHandle(workflowId); + + await waitUntil(async () => { + try { + return (await workflowHandle.describe()).status.name === 'RUNNING'; + } catch { + return false; + } + }, 4000); + + await operation.cancel('test cancellation'); + + await waitUntil(async () => (await operation.describe()).status === NexusOperationExecutionStatus.CANCELED, 4000); + await waitUntil(async () => (await workflowHandle.describe()).status.name === 'CANCELLED', 4000); + + t.is((await operation.describe()).status, NexusOperationExecutionStatus.CANCELED); + t.is((await workflowHandle.describe()).status.name, 'CANCELLED'); + t.true(customCancelCalled); + }); +}); + +test('TemporalOperationHandler cancel rejects invalid operation token type', async (t) => { + class CustomCancelHandler extends temporalnexus.TemporalOperationHandler { + async cancelWorkflowRun(_ctx: nexus.CancelOperationContext, _workflowId: string): Promise { + throw new Error('cancelWorkflow should not be called'); + } + } + + const handler = new CustomCancelHandler(async () => temporalnexus.TemporalOperationResult.sync(undefined)); + const token = base64URLEncodeNoPadding(JSON.stringify({ t: 2, ns: 'test-namespace' })); + + const err = await asyncLocalStorage.run( + { + client: undefined as any, + endpoint: 'test-endpoint', + namespace: 'test-namespace', + taskQueue: 'test-task-queue', + log: undefined as any, + metrics: undefined as any, + }, + async () => { + return await t.throwsAsync( + handler.cancel( + { + abortSignal: new AbortController().signal, + headers: {}, + operation: 'operation', + service: 'service', + }, + token + ) + ); + } + ); + + t.regex(err?.message ?? '', /invalid operation token/); +}); + +test('TemporalOperationHandler async and sync happy paths - caller workflow', async (t) => { + const { createWorker, executeWorkflow, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + + const worker = await createWorker({ + nexusServices: [ + makeTemporalOpServiceHandler({ + asyncOp: new temporalnexus.TemporalOperationHandler(async (_ctx, client, input) => { + return await client.startWorkflow(echoWorkflow, { + workflowId: randomUUID(), + args: [input], + }); + }), + syncOp: new temporalnexus.TemporalOperationHandler(async (_ctx, _client, input) => { + return temporalnexus.TemporalOperationResult.sync(input); + }), + }), + ], + }); + + await worker.runUntil(async () => { + let result = await executeWorkflow(temporalAsyncOpCaller, { + args: [endpointName], + }); + t.is(result, 'hello'); + + result = await executeWorkflow(temporalSyncOpCaller, { + args: [endpointName], + }); + t.is(result, 'hello'); + }); +}); + +test('TemporalOperationHandler rejects multiple async starts', async (t) => { + const { createWorker, executeWorkflow, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + + const worker = await createWorker({ + nexusServices: [ + makeTemporalOpServiceHandler({ + doubleStartOp: new temporalnexus.TemporalOperationHandler(async (_ctx, client, input) => { + await client.startWorkflow(echoWorkflow, { + workflowId: randomUUID(), + args: [input], + }); + await client.startWorkflow(echoWorkflow, { + workflowId: randomUUID(), + args: [input], + }); + throw new nexus.HandlerError('INTERNAL', 'expected previous error to be thrown'); + }), + }), + ], + }); + + await worker.runUntil(async () => { + const err = await t.throwsAsync( + () => + executeWorkflow(temporalDoubleStartOpCaller, { + args: [endpointName], + }), + { + instanceOf: WorkflowFailedError, + } + ); + assert(err?.cause instanceof NexusOperationFailure); + assert(err.cause.cause instanceof nexus.HandlerError); + const inner = innermostHandlerError(err.cause.cause); + t.is(inner.type, 'BAD_REQUEST'); + t.regex(inner.message, /Only one async operation can be started per operation handler invocation/); + }); +}); + +test('TemporalOperationHandler allows retry after failed async start', async (t) => { + const { createWorker, executeWorkflow, startWorkflow, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + const conflictWorkflowId = randomUUID(); + + const worker = await createWorker({ + nexusServices: [ + makeTemporalOpServiceHandler({ + retryAfterFailedStartOp: new temporalnexus.TemporalOperationHandler( + async (_ctx, client, workflowId) => { + try { + await client.startWorkflow(blockingTargetWorkflow, { + workflowId, + workflowIdConflictPolicy: 'FAIL', + }); + } catch (err) { + if (!(err instanceof WorkflowExecutionAlreadyStartedError)) { + throw err; + } + return await client.startWorkflow(echoWorkflow, { + workflowId: randomUUID(), + args: [workflowId], + }); + } + throw new nexus.HandlerError('INTERNAL', 'Expected first workflow start to fail'); + } + ), + }), + ], + }); + + await worker.runUntil(async () => { + const conflictHandle = await startWorkflow(blockingTargetWorkflow, { + workflowId: conflictWorkflowId, + }); + try { + const result = await executeWorkflow(temporalRetryAfterFailedStartOpCaller, { + args: [endpointName, conflictWorkflowId], + }); + t.is(result, conflictWorkflowId); + } finally { + await conflictHandle.cancel(); + } + }); +}); + +test('TemporalOperationHandler default cancelWorkflow cancels backing workflow', async (t) => { + const { createWorker, startWorkflow, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + const targetWorkflowId = randomUUID(); + + const worker = await createWorker({ + nexusServices: [ + makeTemporalCancelOpServiceHandler({ + blockingOp: new temporalnexus.TemporalOperationHandler(async (_ctx, client, workflowId) => { + return await client.startWorkflow(blockingTargetWorkflow, { + workflowId, + }); + }), + }), + ], + }); + + await worker.runUntil(async () => { + const callerHandle = await startWorkflow(temporalDefaultCancelWorkflowCaller, { + args: [endpointName, targetWorkflowId], + }); + + await waitUntil( + async () => !!(await callerHandle.fetchHistory()).events?.some((ev) => ev.nexusOperationStartedEventAttributes), + 4000 + ); + + const targetHandle = t.context.env.client.workflow.getHandle(targetWorkflowId); + t.is((await targetHandle.describe()).status.name, 'RUNNING'); + + await callerHandle.cancel(); + await callerHandle.result(); + + await waitUntil(async () => (await targetHandle.describe()).status.name === 'CANCELLED', 4000); + t.is((await targetHandle.describe()).status.name, 'CANCELLED'); + }); +}); From 51f5ae4fbf548a40614f3450768e6625dc2b14c8 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Wed, 27 May 2026 10:31:32 -0700 Subject: [PATCH 02/12] Update loadWorkflowRunOperationToken tests --- packages/nexus/src/__tests__/test-nexus-token-helpers.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/nexus/src/__tests__/test-nexus-token-helpers.ts b/packages/nexus/src/__tests__/test-nexus-token-helpers.ts index 367949c879..1662806812 100644 --- a/packages/nexus/src/__tests__/test-nexus-token-helpers.ts +++ b/packages/nexus/src/__tests__/test-nexus-token-helpers.ts @@ -35,7 +35,10 @@ test('decode Operation token errors', (t) => { test('decode workflow run Operation token errors', (t) => { const invalidTypeToken = base64URLEncodeNoPadding('{"t":2,"ns":"ns"}'); - t.throws(() => loadOperationToken(invalidTypeToken), { + t.throws(() => loadWorkflowRunOperationToken(invalidTypeToken), { + // This currently fails on unknown token type as there are no other existing token types. + // When new token types are added this regex will need to be updated to + // /invalid workflow token type: 2/ message: /invalid operation token: unknown token type: 2/, }); From 8eb0efcf61ec09e9259d15ab9ca9132cd7fb0a06 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Wed, 27 May 2026 10:58:10 -0700 Subject: [PATCH 03/12] Minor tweaks from self review --- packages/nexus/src/token.ts | 4 ---- packages/nexus/src/workflow-helpers.ts | 8 +++---- .../test/src/test-nexus-temporal-operation.ts | 24 +++++++------------ 3 files changed, 13 insertions(+), 23 deletions(-) diff --git a/packages/nexus/src/token.ts b/packages/nexus/src/token.ts index 762c573287..a3d706a63f 100644 --- a/packages/nexus/src/token.ts +++ b/packages/nexus/src/token.ts @@ -28,10 +28,6 @@ export interface OperationToken { wid?: string; } -/** - * @internal - * @hidden - */ export interface WorkflowRunOperationToken extends OperationToken { t: typeof OperationTokenType.WORKFLOW_RUN; wid: string; diff --git a/packages/nexus/src/workflow-helpers.ts b/packages/nexus/src/workflow-helpers.ts index dd395def7d..0d1be9d7f2 100644 --- a/packages/nexus/src/workflow-helpers.ts +++ b/packages/nexus/src/workflow-helpers.ts @@ -187,9 +187,9 @@ export const TemporalOperationResult = { }; }, - async(token: string): TemporalOperationResult { + async(token: string): TemporalOperationResult { return { - [toHandlerResult](): nexus.HandlerStartOperationResult { + [toHandlerResult](): nexus.HandlerStartOperationResult { return nexus.HandlerStartOperationResult.async(token); }, }; @@ -292,7 +292,7 @@ export class TemporalOperationHandler implements nexus.OperationHandler { + async cancel(ctx: nexus.CancelOperationContext, token: string): Promise { const { namespace } = getHandlerContext(); let opToken; try { @@ -310,7 +310,7 @@ export class TemporalOperationHandler implements nexus.OperationHandler { const client = workflow.createNexusServiceClient({ endpoint, service: temporalCancelOpService }); - try { - await client.executeOperation('blockingOp', targetWorkflowId, { - cancellationType: 'WAIT_CANCELLATION_COMPLETED', - }); - } catch (err) { - if (workflow.isCancellation(err)) return; - throw err; - } + await client.executeOperation('blockingOp', targetWorkflowId, { + cancellationType: 'WAIT_CANCELLATION_COMPLETED', + }); } //////////////////////////////////////////////////////////////////////////////////////////////////// @@ -209,8 +204,6 @@ test('TemporalOperationHandler cancel delegates to overridable cancelWorkflow', await waitUntil(async () => (await operation.describe()).status === NexusOperationExecutionStatus.CANCELED, 4000); await waitUntil(async () => (await workflowHandle.describe()).status.name === 'CANCELLED', 4000); - t.is((await operation.describe()).status, NexusOperationExecutionStatus.CANCELED); - t.is((await workflowHandle.describe()).status.name, 'CANCELLED'); t.true(customCancelCalled); }); }); @@ -218,7 +211,7 @@ test('TemporalOperationHandler cancel delegates to overridable cancelWorkflow', test('TemporalOperationHandler cancel rejects invalid operation token type', async (t) => { class CustomCancelHandler extends temporalnexus.TemporalOperationHandler { async cancelWorkflowRun(_ctx: nexus.CancelOperationContext, _workflowId: string): Promise { - throw new Error('cancelWorkflow should not be called'); + throw new Error('cancelWorkflowRun should not be called'); } } @@ -349,7 +342,9 @@ test('TemporalOperationHandler allows retry after failed async start', async (t) args: [workflowId], }); } - throw new nexus.HandlerError('INTERNAL', 'Expected first workflow start to fail'); + throw new nexus.HandlerError('INTERNAL', 'Expected first workflow start to fail', { + retryableOverride: false, + }); } ), }), @@ -371,7 +366,7 @@ test('TemporalOperationHandler allows retry after failed async start', async (t) }); }); -test('TemporalOperationHandler default cancelWorkflow cancels backing workflow', async (t) => { +test('TemporalOperationHandler default cancelWorkflowRun cancels backing workflow', async (t) => { const { createWorker, startWorkflow, registerNexusEndpoint } = helpers(t); const { endpointName } = await registerNexusEndpoint(); const targetWorkflowId = randomUUID(); @@ -402,9 +397,8 @@ test('TemporalOperationHandler default cancelWorkflow cancels backing workflow', t.is((await targetHandle.describe()).status.name, 'RUNNING'); await callerHandle.cancel(); - await callerHandle.result(); + await waitUntil(async () => (await callerHandle.describe()).status.name === 'CANCELLED', 4000); await waitUntil(async () => (await targetHandle.describe()).status.name === 'CANCELLED', 4000); - t.is((await targetHandle.describe()).status.name, 'CANCELLED'); }); }); From d91e71a7a3b938d08f99b569811cf52d501d8eb6 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Wed, 27 May 2026 15:22:46 -0700 Subject: [PATCH 04/12] Address some claude & codex suggestions. Swap to taking options rather than requiring subclassing --- packages/nexus/src/index.ts | 1 + packages/nexus/src/token.ts | 16 +- packages/nexus/src/workflow-helpers.ts | 42 ++++-- .../test/src/test-nexus-temporal-operation.ts | 141 ++++++++++-------- 4 files changed, 120 insertions(+), 80 deletions(-) diff --git a/packages/nexus/src/index.ts b/packages/nexus/src/index.ts index e9ca7f3d52..cab3db9c77 100644 --- a/packages/nexus/src/index.ts +++ b/packages/nexus/src/index.ts @@ -15,6 +15,7 @@ export { export { startWorkflow, + type TemporalOperationHandlerOptions, TemporalOperationHandler, TemporalOperationResult, type TemporalNexusClient, diff --git a/packages/nexus/src/token.ts b/packages/nexus/src/token.ts index a3d706a63f..486fe36cb9 100644 --- a/packages/nexus/src/token.ts +++ b/packages/nexus/src/token.ts @@ -1,6 +1,5 @@ /** - * OperationTokenType is used to identify the type of Operation token. - * Currently, we only have one type of Operation token: WorkflowRun. + * Serializable token identifying a Nexus operation target. * * @internal * @hidden @@ -28,11 +27,24 @@ export interface OperationToken { wid?: string; } +/** + * An OperationToken that identifies a WorkflowRun operation. + * + * @internal + * @hidden + */ export interface WorkflowRunOperationToken extends OperationToken { t: typeof OperationTokenType.WORKFLOW_RUN; wid: string; } +/** + * OperationTokenType is used to identify the type of Operation token. + * Currently, we only have one type of Operation token: WorkflowRun. + * + * @internal + * @hidden + */ export type OperationTokenType = (typeof OperationTokenType)[keyof typeof OperationTokenType]; /** diff --git a/packages/nexus/src/workflow-helpers.ts b/packages/nexus/src/workflow-helpers.ts index 0d1be9d7f2..3d6e7455e1 100644 --- a/packages/nexus/src/workflow-helpers.ts +++ b/packages/nexus/src/workflow-helpers.ts @@ -279,16 +279,31 @@ export type TemporalOperationStartHandler = ( input: I ) => Promise>; +/** + * Options for customizing a {@link TemporalOperationHandler}. + * + * @experimental Nexus support in Temporal SDK is experimental. + */ +export interface TemporalOperationHandlerOptions { + cancelWorkflowRun?: (ctx: nexus.CancelOperationContext, client: Client, workflowId: string) => Promise; +} + /** * A Nexus Operation implementation for operations that interact with Temporal. * * @experimental Nexus support in Temporal SDK is experimental. */ export class TemporalOperationHandler implements nexus.OperationHandler { - constructor(readonly handler: TemporalOperationStartHandler) {} + private readonly startHandler: TemporalOperationStartHandler; + private readonly cancelWorkflowRunHandler: NonNullable; + + constructor(options: { start: TemporalOperationStartHandler } & TemporalOperationHandlerOptions) { + this.startHandler = options.start; + this.cancelWorkflowRunHandler = options.cancelWorkflowRun ?? defaultCancelWorkflowRun; + } async start(ctx: nexus.StartOperationContext, input: I): Promise> { - const result = await this.handler(ctx, new TemporalNexusClientImpl(ctx), input); + const result = await this.startHandler(ctx, new TemporalNexusClientImpl(ctx), input); return result[toHandlerResult](); } @@ -309,8 +324,14 @@ export class TemporalOperationHandler implements nexus.OperationHandler implements nexus.OperationHandler { - await getClient().workflow.getHandle(workflowId).cancel(); - } +async function defaultCancelWorkflowRun(_ctx: nexus.CancelOperationContext, client: Client, workflowId: string) { + await client.workflow.getHandle(workflowId).cancel(); } diff --git a/packages/test/src/test-nexus-temporal-operation.ts b/packages/test/src/test-nexus-temporal-operation.ts index d9fcf1cfca..98dd45d045 100644 --- a/packages/test/src/test-nexus-temporal-operation.ts +++ b/packages/test/src/test-nexus-temporal-operation.ts @@ -49,8 +49,10 @@ type TemporalOpServiceHandlers = nexus.ServiceHandlerFor; function unusedTemporalOperationHandler(): nexus.OperationHandler { - return new temporalnexus.TemporalOperationHandler(async () => { - throw new nexus.HandlerError('NOT_IMPLEMENTED', 'not used by this test'); + return new temporalnexus.TemporalOperationHandler({ + async start() { + throw new nexus.HandlerError('NOT_IMPLEMENTED', 'not used by this test'); + }, }); } @@ -114,67 +116,68 @@ export async function blockingTargetWorkflow(): Promise { // Tests test('TemporalOperationHandler infers correct output type from typed workflow function', async (t) => { - const _stringOp: nexus.OperationHandler = new temporalnexus.TemporalOperationHandler( - async (_ctx, client, input: string) => { + const _stringOp: nexus.OperationHandler = new temporalnexus.TemporalOperationHandler({ + async start(_ctx, client, input: string) { return await client.startWorkflow(echoWorkflow, { args: [input], workflowId: 'test', }); - } - ); + }, + }); // @ts-expect-error - Output type should be string, not number - const _mismatchedOp: nexus.OperationHandler = new temporalnexus.TemporalOperationHandler( - async (_ctx, client, input: string) => { + const _mismatchedOp: nexus.OperationHandler = new temporalnexus.TemporalOperationHandler({ + async start(_ctx, client, input: string) { return await client.startWorkflow(echoWorkflow, { args: [input], workflowId: 'test', }); - } - ); + }, + }); - const _syncOp: nexus.OperationHandler = new temporalnexus.TemporalOperationHandler( - async (_ctx, _client, input: string) => { + const _syncOp: nexus.OperationHandler = new temporalnexus.TemporalOperationHandler({ + async start(_ctx, _client, input: string) { return temporalnexus.TemporalOperationResult.sync(input); - } - ); + }, + }); const _explicitStringOp: nexus.OperationHandler = new temporalnexus.TemporalOperationHandler< string, string - >(async (_ctx, client, input) => { - return await client.startWorkflow(echoWorkflow, { - args: [input], - workflowId: 'test', - }); + >({ + async start(_ctx, client, input) { + return await client.startWorkflow(echoWorkflow, { + args: [input], + workflowId: 'test', + }); + }, }); // This test only checks for compile-time errors. t.pass(); }); -test('TemporalOperationHandler cancel delegates to overridable cancelWorkflow', async (t) => { +test('TemporalOperationHandler cancel delegates to provided cancelWorkflowRun handler', async (t) => { const { createWorker, registerNexusEndpoint } = helpers(t); const { endpointName } = await registerNexusEndpoint(); const workflowId = randomUUID(); let customCancelCalled = false; - class CustomCancelHandler extends temporalnexus.TemporalOperationHandler { - async cancelWorkflowRun(_ctx: nexus.CancelOperationContext, workflowId: string): Promise { - const handle = temporalnexus.getClient().workflow.getHandle(workflowId); - await handle.cancel(); - customCancelCalled = true; - } - } - const worker = await createWorker({ nexusServices: [ makeTemporalCancelOpServiceHandler({ - blockingOp: new CustomCancelHandler(async (_ctx, client, workflowId) => { - return await client.startWorkflow(blockingTargetWorkflow, { - workflowId, - }); + blockingOp: new temporalnexus.TemporalOperationHandler({ + async start(_ctx, client, workflowId) { + return await client.startWorkflow(blockingTargetWorkflow, { + workflowId, + }); + }, + async cancelWorkflowRun(_ctx, client, workflowId) { + const handle = client.workflow.getHandle(workflowId); + await handle.cancel(); + customCancelCalled = true; + }, }), }), ], @@ -208,14 +211,16 @@ test('TemporalOperationHandler cancel delegates to overridable cancelWorkflow', }); }); -test('TemporalOperationHandler cancel rejects invalid operation token type', async (t) => { - class CustomCancelHandler extends temporalnexus.TemporalOperationHandler { - async cancelWorkflowRun(_ctx: nexus.CancelOperationContext, _workflowId: string): Promise { - throw new Error('cancelWorkflowRun should not be called'); - } - } +test('TemporalOperationHandler.cancel rejects invalid operation token type before invoking cancellation hooks', async (t) => { + const handler = new temporalnexus.TemporalOperationHandler({ + async start(_ctx, _client, _input) { + return temporalnexus.TemporalOperationResult.sync(undefined); + }, - const handler = new CustomCancelHandler(async () => temporalnexus.TemporalOperationResult.sync(undefined)); + async cancelWorkflowRun(_ctx, _client, _workflowId) { + throw new Error('cancelWorkflowRun should not be called'); + }, + }); const token = base64URLEncodeNoPadding(JSON.stringify({ t: 2, ns: 'test-namespace' })); const err = await asyncLocalStorage.run( @@ -252,14 +257,18 @@ test('TemporalOperationHandler async and sync happy paths - caller workflow', as const worker = await createWorker({ nexusServices: [ makeTemporalOpServiceHandler({ - asyncOp: new temporalnexus.TemporalOperationHandler(async (_ctx, client, input) => { - return await client.startWorkflow(echoWorkflow, { - workflowId: randomUUID(), - args: [input], - }); + asyncOp: new temporalnexus.TemporalOperationHandler({ + async start(_ctx, client, input) { + return await client.startWorkflow(echoWorkflow, { + workflowId: randomUUID(), + args: [input], + }); + }, }), - syncOp: new temporalnexus.TemporalOperationHandler(async (_ctx, _client, input) => { - return temporalnexus.TemporalOperationResult.sync(input); + syncOp: new temporalnexus.TemporalOperationHandler({ + async start(_ctx, _client, input) { + return temporalnexus.TemporalOperationResult.sync(input); + }, }), }), ], @@ -285,16 +294,18 @@ test('TemporalOperationHandler rejects multiple async starts', async (t) => { const worker = await createWorker({ nexusServices: [ makeTemporalOpServiceHandler({ - doubleStartOp: new temporalnexus.TemporalOperationHandler(async (_ctx, client, input) => { - await client.startWorkflow(echoWorkflow, { - workflowId: randomUUID(), - args: [input], - }); - await client.startWorkflow(echoWorkflow, { - workflowId: randomUUID(), - args: [input], - }); - throw new nexus.HandlerError('INTERNAL', 'expected previous error to be thrown'); + doubleStartOp: new temporalnexus.TemporalOperationHandler({ + async start(_ctx, client, input) { + await client.startWorkflow(echoWorkflow, { + workflowId: randomUUID(), + args: [input], + }); + await client.startWorkflow(echoWorkflow, { + workflowId: randomUUID(), + args: [input], + }); + throw new nexus.HandlerError('INTERNAL', 'expected previous error to be thrown'); + }, }), }), ], @@ -326,8 +337,8 @@ test('TemporalOperationHandler allows retry after failed async start', async (t) const worker = await createWorker({ nexusServices: [ makeTemporalOpServiceHandler({ - retryAfterFailedStartOp: new temporalnexus.TemporalOperationHandler( - async (_ctx, client, workflowId) => { + retryAfterFailedStartOp: new temporalnexus.TemporalOperationHandler({ + async start(_ctx, client, workflowId) { try { await client.startWorkflow(blockingTargetWorkflow, { workflowId, @@ -345,8 +356,8 @@ test('TemporalOperationHandler allows retry after failed async start', async (t) throw new nexus.HandlerError('INTERNAL', 'Expected first workflow start to fail', { retryableOverride: false, }); - } - ), + }, + }), }), ], }); @@ -374,10 +385,12 @@ test('TemporalOperationHandler default cancelWorkflowRun cancels backing workflo const worker = await createWorker({ nexusServices: [ makeTemporalCancelOpServiceHandler({ - blockingOp: new temporalnexus.TemporalOperationHandler(async (_ctx, client, workflowId) => { - return await client.startWorkflow(blockingTargetWorkflow, { - workflowId, - }); + blockingOp: new temporalnexus.TemporalOperationHandler({ + async start(_ctx, client, workflowId) { + return await client.startWorkflow(blockingTargetWorkflow, { + workflowId, + }); + }, }), }), ], From 978b0abd9f67c88bbb9fe393fad2ed64f2a7493c Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Thu, 28 May 2026 15:16:50 -0700 Subject: [PATCH 05/12] introduce temporal native context interfaces to use rather than the nexusrpc interfaces. Add options type to cancelWorkflowRun --- packages/nexus/src/context.ts | 19 ++++++++++++ packages/nexus/src/index.ts | 3 ++ packages/nexus/src/workflow-helpers.ts | 31 +++++++++++++++---- .../test/src/test-nexus-temporal-operation.ts | 6 ++-- 4 files changed, 50 insertions(+), 9 deletions(-) diff --git a/packages/nexus/src/context.ts b/packages/nexus/src/context.ts index fee7823f18..b5dfbdb92f 100644 --- a/packages/nexus/src/context.ts +++ b/packages/nexus/src/context.ts @@ -1,4 +1,5 @@ import { AsyncLocalStorage } from 'node:async_hooks'; +import type * as nexus from 'nexus-rpc'; import type { Logger, LogLevel, LogMetadata, MetricMeter } from '@temporalio/common'; import type { Client } from '@temporalio/client'; @@ -57,6 +58,24 @@ export interface OperationInfo { readonly endpoint: string; } +/** + * Context received by a {@link TemporalOperationHandler}'s start handler when a Nexus Operation is + * started. + * + * @experimental Nexus support in Temporal SDK is experimental. + */ +// eslint-disable-next-line @typescript-eslint/no-empty-object-type +export interface TemporalNexusStartOperationContext extends nexus.StartOperationContext {} + +/** + * Context received by a {@link TemporalOperationHandler}'s cancel handler when a Nexus Operation is + * canceled. + * + * @experimental Nexus support in Temporal SDK is experimental. + */ +// eslint-disable-next-line @typescript-eslint/no-empty-object-type +export interface TemporalNexusCancelOperationContext extends nexus.CancelOperationContext {} + // Basic APIs ////////////////////////////////////////////////////////////////////////////////////// /** diff --git a/packages/nexus/src/index.ts b/packages/nexus/src/index.ts index cab3db9c77..5555605262 100644 --- a/packages/nexus/src/index.ts +++ b/packages/nexus/src/index.ts @@ -11,10 +11,13 @@ export { metricMeter, operationInfo, type OperationInfo, + type TemporalNexusCancelOperationContext, + type TemporalNexusStartOperationContext, } from './context'; export { startWorkflow, + type CancelWorkflowRunOptions, type TemporalOperationHandlerOptions, TemporalOperationHandler, TemporalOperationResult, diff --git a/packages/nexus/src/workflow-helpers.ts b/packages/nexus/src/workflow-helpers.ts index 3d6e7455e1..eb84c245eb 100644 --- a/packages/nexus/src/workflow-helpers.ts +++ b/packages/nexus/src/workflow-helpers.ts @@ -13,7 +13,13 @@ import { loadWorkflowRunOperationToken, OperationTokenType, } from './token'; -import { getClient, getHandlerContext, log } from './context'; +import { + getClient, + getHandlerContext, + log, + type TemporalNexusCancelOperationContext, + type TemporalNexusStartOperationContext, +} from './context'; declare const isNexusWorkflowHandle: unique symbol; declare const workflowResultType: unique symbol; @@ -274,18 +280,31 @@ class TemporalNexusClientImpl implements TemporalNexusClient { * @experimental Nexus support in Temporal SDK is experimental. */ export type TemporalOperationStartHandler = ( - ctx: nexus.StartOperationContext, + ctx: TemporalNexusStartOperationContext, client: TemporalNexusClient, input: I ) => Promise>; +/** + * Options passed to a {@link TemporalOperationHandlerOptions.cancelWorkflowRun} handler describing + * the workflow run to cancel. + * + * @experimental Nexus support in Temporal SDK is experimental. + */ +export interface CancelWorkflowRunOptions { + /** + * The ID of the workflow backing the Nexus Operation that is being canceled. + */ + readonly workflowId: string; +} + /** * Options for customizing a {@link TemporalOperationHandler}. * * @experimental Nexus support in Temporal SDK is experimental. */ export interface TemporalOperationHandlerOptions { - cancelWorkflowRun?: (ctx: nexus.CancelOperationContext, client: Client, workflowId: string) => Promise; + cancelWorkflowRun?: (ctx: TemporalNexusCancelOperationContext, options: CancelWorkflowRunOptions) => Promise; } /** @@ -331,7 +350,7 @@ export class TemporalOperationHandler implements nexus.OperationHandler implements nexus.OperationHandler Date: Thu, 28 May 2026 16:10:39 -0700 Subject: [PATCH 06/12] address points raised by new opus 4.8 --- packages/nexus/src/workflow-helpers.ts | 27 +++++++------------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/packages/nexus/src/workflow-helpers.ts b/packages/nexus/src/workflow-helpers.ts index eb84c245eb..cfacfdef77 100644 --- a/packages/nexus/src/workflow-helpers.ts +++ b/packages/nexus/src/workflow-helpers.ts @@ -168,11 +168,9 @@ export class WorkflowRunOperationHandler implements nexus.OperationHandler } /** - * Module-private brand and conversion key for {@link TemporalOperationResult}. Doubles as the - * type-level brand (so external code can't satisfy the interface) and the runtime accessor - * (since the symbol is in scope inside this module). + * Module-private brand and payload key for {@link TemporalOperationResult}. */ -const toHandlerResult: unique symbol = Symbol.for('__temporal_nexus_TemporalOperationResult_toHandlerResult__'); +const operationResult: unique symbol = Symbol('temporal_nexus_TemporalOperationResult'); /** * A result produced by a {@link TemporalOperationHandler}. Construct via @@ -181,23 +179,19 @@ const toHandlerResult: unique symbol = Symbol.for('__temporal_nexus_TemporalOper * @experimental Nexus support in Temporal SDK is experimental. */ export interface TemporalOperationResult { - readonly [toHandlerResult]: () => nexus.HandlerStartOperationResult; + readonly [operationResult]: nexus.HandlerStartOperationResult; } export const TemporalOperationResult = { sync(value: T): TemporalOperationResult { return { - [toHandlerResult](): nexus.HandlerStartOperationResult { - return nexus.HandlerStartOperationResult.sync(value); - }, + [operationResult]: nexus.HandlerStartOperationResult.sync(value), }; }, async(token: string): TemporalOperationResult { return { - [toHandlerResult](): nexus.HandlerStartOperationResult { - return nexus.HandlerStartOperationResult.async(token); - }, + [operationResult]: nexus.HandlerStartOperationResult.async(token), }; }, }; @@ -229,7 +223,7 @@ export interface TemporalNexusClient { class TemporalNexusClientImpl implements TemporalNexusClient { private asyncOperationStarted = false; - constructor(private readonly startOperationContext: nexus.StartOperationContext) {} + constructor(private readonly startOperationContext: TemporalNexusStartOperationContext) {} /** * The Temporal Client for the active Nexus Operation. @@ -323,11 +317,10 @@ export class TemporalOperationHandler implements nexus.OperationHandler> { const result = await this.startHandler(ctx, new TemporalNexusClientImpl(ctx), input); - return result[toHandlerResult](); + return result[operationResult]; } async cancel(ctx: nexus.CancelOperationContext, token: string): Promise { - const { namespace } = getHandlerContext(); let opToken; try { opToken = loadOperationToken(token); @@ -335,12 +328,6 @@ export class TemporalOperationHandler implements nexus.OperationHandler Date: Mon, 1 Jun 2026 16:37:56 -0700 Subject: [PATCH 07/12] Swap from TemporalNexus prefix to Temporal --- packages/nexus/src/context.ts | 4 ++-- packages/nexus/src/index.ts | 4 ++-- packages/nexus/src/workflow-helpers.ts | 12 ++++++------ 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/packages/nexus/src/context.ts b/packages/nexus/src/context.ts index b5dfbdb92f..35dc73b9a9 100644 --- a/packages/nexus/src/context.ts +++ b/packages/nexus/src/context.ts @@ -65,7 +65,7 @@ export interface OperationInfo { * @experimental Nexus support in Temporal SDK is experimental. */ // eslint-disable-next-line @typescript-eslint/no-empty-object-type -export interface TemporalNexusStartOperationContext extends nexus.StartOperationContext {} +export interface TemporalStartOperationContext extends nexus.StartOperationContext {} /** * Context received by a {@link TemporalOperationHandler}'s cancel handler when a Nexus Operation is @@ -74,7 +74,7 @@ export interface TemporalNexusStartOperationContext extends nexus.StartOperation * @experimental Nexus support in Temporal SDK is experimental. */ // eslint-disable-next-line @typescript-eslint/no-empty-object-type -export interface TemporalNexusCancelOperationContext extends nexus.CancelOperationContext {} +export interface TemporalCancelOperationContext extends nexus.CancelOperationContext {} // Basic APIs ////////////////////////////////////////////////////////////////////////////////////// diff --git a/packages/nexus/src/index.ts b/packages/nexus/src/index.ts index 5555605262..96f720bae4 100644 --- a/packages/nexus/src/index.ts +++ b/packages/nexus/src/index.ts @@ -11,8 +11,8 @@ export { metricMeter, operationInfo, type OperationInfo, - type TemporalNexusCancelOperationContext, - type TemporalNexusStartOperationContext, + type TemporalCancelOperationContext, + type TemporalStartOperationContext, } from './context'; export { diff --git a/packages/nexus/src/workflow-helpers.ts b/packages/nexus/src/workflow-helpers.ts index cfacfdef77..caf9ae18ae 100644 --- a/packages/nexus/src/workflow-helpers.ts +++ b/packages/nexus/src/workflow-helpers.ts @@ -17,8 +17,8 @@ import { getClient, getHandlerContext, log, - type TemporalNexusCancelOperationContext, - type TemporalNexusStartOperationContext, + type TemporalCancelOperationContext, + type TemporalStartOperationContext, } from './context'; declare const isNexusWorkflowHandle: unique symbol; @@ -223,7 +223,7 @@ export interface TemporalNexusClient { class TemporalNexusClientImpl implements TemporalNexusClient { private asyncOperationStarted = false; - constructor(private readonly startOperationContext: TemporalNexusStartOperationContext) {} + constructor(private readonly startOperationContext: TemporalStartOperationContext) {} /** * The Temporal Client for the active Nexus Operation. @@ -274,7 +274,7 @@ class TemporalNexusClientImpl implements TemporalNexusClient { * @experimental Nexus support in Temporal SDK is experimental. */ export type TemporalOperationStartHandler = ( - ctx: TemporalNexusStartOperationContext, + ctx: TemporalStartOperationContext, client: TemporalNexusClient, input: I ) => Promise>; @@ -298,7 +298,7 @@ export interface CancelWorkflowRunOptions { * @experimental Nexus support in Temporal SDK is experimental. */ export interface TemporalOperationHandlerOptions { - cancelWorkflowRun?: (ctx: TemporalNexusCancelOperationContext, options: CancelWorkflowRunOptions) => Promise; + cancelWorkflowRun?: (ctx: TemporalCancelOperationContext, options: CancelWorkflowRunOptions) => Promise; } /** @@ -348,6 +348,6 @@ export class TemporalOperationHandler implements nexus.OperationHandler Date: Mon, 1 Jun 2026 15:54:59 -0700 Subject: [PATCH 08/12] initial draft of SAA temporal operation handler integration --- packages/client/src/activity-client.ts | 172 +---------- packages/client/src/activity-options.ts | 149 ++++++++++ packages/client/src/index.ts | 1 + packages/client/src/interceptors.ts | 3 +- packages/client/src/internal.ts | 22 +- packages/client/src/workflow-client.ts | 6 +- .../src/__tests__/test-nexus-token-helpers.ts | 5 +- packages/nexus/src/index.ts | 4 + packages/nexus/src/token.ts | 54 +++- packages/nexus/src/workflow-helpers.ts | 271 ++++++++++++++---- .../test/src/test-nexus-temporal-operation.ts | 72 ++++- 11 files changed, 531 insertions(+), 228 deletions(-) create mode 100644 packages/client/src/activity-options.ts diff --git a/packages/client/src/activity-client.ts b/packages/client/src/activity-client.ts index 7bc83b26ea..0846dbe83e 100644 --- a/packages/client/src/activity-client.ts +++ b/packages/client/src/activity-client.ts @@ -1,14 +1,6 @@ import { randomUUID } from 'node:crypto'; import { status as grpcStatus } from '@grpc/grpc-js'; -import type { - ActivityFunction, - LoadedDataConverter, - Next, - Priority, - RetryPolicy, - SearchAttributePair, - TypedSearchAttributes, -} from '@temporalio/common'; +import type { LoadedDataConverter, Next } from '@temporalio/common'; import { compilePriority, compileRetryPolicy, @@ -16,7 +8,6 @@ import { decodePriority, decompileRetryPolicy, } from '@temporalio/common'; -import type { Duration } from '@temporalio/common/lib/time'; import { msOptionalToTs, msToNumber, optionalTsToDate, optionalTsToMs } from '@temporalio/common/lib/time'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; import { @@ -32,7 +23,7 @@ import { encodeUserMetadata, } from '@temporalio/common/lib/internal-non-workflow'; import { temporal } from '@temporalio/proto'; -import type { Replace } from '@temporalio/common/lib/type-helpers'; +import { filterNullAndUndefined } from '@temporalio/common/lib/internal-workflow'; import type { ActivityCancelInput, ActivityClientInterceptor, @@ -45,13 +36,7 @@ import type { } from './interceptors'; import type { AsyncCompletionClientOptions } from './async-completion-client'; import { AsyncCompletionClient } from './async-completion-client'; -import type { - ActivityExecutionDescription, - ActivityExecutionInfo, - ActivityIdConflictPolicy, - ActivityIdReusePolicy, - CountActivityExecutions, -} from './types'; +import type { ActivityExecutionDescription, ActivityExecutionInfo, CountActivityExecutions } from './types'; import { decodeActivityExecutionStatus, decodePendingActivityState, @@ -67,6 +52,8 @@ import { ActivityExecutionFailedError, ActivityExecutionAlreadyStartedError, } from './errors'; +import { type InternalActivityStartOptions, InternalNexusStartOptionsSymbol } from './internal'; +import type { ActivityOptions, ActivityOptionsFor, ActivityResult, ActivityName } from './activity-options'; /** * Options used to configure {@link ActivityClient} @@ -256,10 +243,17 @@ export class ActivityClient extends AsyncCompletionClient implements TypedActivi } validateActivityOptions(input.options); + const internalOptions = (input.options as InternalActivityStartOptions)[InternalNexusStartOptionsSymbol]; + try { const resp = await this.workflowService.startActivityExecution( await this.buildStartActivityExecutionRequest(input) ); + + if (internalOptions != null) { + internalOptions.backLink = resp.link ?? undefined; + } + return this.createHandle(input.options.id, resp.runId); } catch (err) { if (isGrpcServiceError(err) && err.code === grpcStatus.ALREADY_EXISTS) { @@ -290,6 +284,8 @@ export class ActivityClient extends AsyncCompletionClient implements TypedActivi ? { indexedFields: encodeUnifiedSearchAttributes(undefined, input.options.typedSearchAttributes) } : undefined; + const internalOptions = (input.options as InternalActivityStartOptions)[InternalNexusStartOptionsSymbol]; + return { namespace: this.options.namespace, identity: this.options.identity, @@ -310,6 +306,7 @@ export class ActivityClient extends AsyncCompletionClient implements TypedActivi userMetadata: await encodeUserMetadata(this.dataConverter, input.options.summary, undefined), priority: input.options.priority ? compilePriority(input.options.priority) : undefined, startDelay: msOptionalToTs(input.options.startDelay), + ...filterNullAndUndefined(internalOptions ?? {}), }; } @@ -495,77 +492,6 @@ export interface ActivityHandle { terminate(reason: string): Promise; } -/** - * Options used by {@link ActivityClient.start}. - * - * @experimental Standalone Activities are experimental. APIs may be subject to change. - */ -export interface ActivityOptions { - /** - * Activity ID of the started activity. It's recommended to use a meaningful business ID. - */ - id: string; - /** - * Task queue to run this activity on. - */ - taskQueue: string; - /** - * Input arguments to pass to the activity. - */ - args?: any[] | Readonly; - /** - * If set, specifies maximum time between successful heartbeats. - */ - heartbeatTimeout?: Duration; - /** - * Controls how Activity is retried. If not set, the server will assign default retry policy. - */ - retry?: RetryPolicy; - /** - * Is set, specifies total time the activity is allowed to run, including retries. - * - * Note: it is required to set at least one of {@link startToCloseTimeout} and {@link scheduleToCloseTimeout}. - */ - startToCloseTimeout?: Duration; - /** - * If set, specifies maximum time the activity can wait in the task queue before being picked up by a worker. - * This timeout is non-retryable. - */ - scheduleToStartTimeout?: Duration; - /** - * If set, specifies maximum time for a single execution attempt. This timeout is retryable. - * - * Note: it is required to set at least one of {@link startToCloseTimeout} and {@link scheduleToCloseTimeout}. - */ - scheduleToCloseTimeout?: Duration; - /** - * A single-line fixed summary for this activity execution that may appear in UI/CLI. - * This can be in single-line Temporal markdown format. - */ - summary?: string; - /** - * Priority to use when starting this activity. - */ - priority?: Priority; - /** - * Time to wait before dispatching the first activity task. This delay is not applied to retry attempts. - */ - startDelay?: Duration; - /** - * Specifies behavior if there's a *closed* activity with the same ID. - */ - idReusePolicy?: ActivityIdReusePolicy; - /** - * Specifies behavior if there's a *running* activity with the same ID. Note that there can only be one running - * Activity for each Activity ID. - */ - idConflictPolicy?: ActivityIdConflictPolicy; - /** - * Search attributes for the activity. - */ - typedSearchAttributes?: SearchAttributePair[] | TypedSearchAttributes; -} - function validateActivityOptions(options: ActivityOptions): void { if (!options.id) { throw new TypeError('id is required'); @@ -663,71 +589,3 @@ export interface TypedActivityClient { execute>(activity: N, options: ActivityOptionsFor): Promise>; } - -/** - * Utility type to support strong typing in {@link TypedActivityClient}. - * Contains names of activities extracted from the specified activity interface. - * @template T Activity interface - * - * @experimental Standalone Activities are experimental. APIs may be subject to change. - */ -export type ActivityName = { - [N in keyof T & string]: T[N] extends ActivityFunction ? N : never; -}[keyof T & string]; - -/** - * Utility type to support strong typing in {@link TypedActivityClient}. - * Extracts argument types of an activity. - * @template T Activity interface - * @template N Activity name - * - * @experimental Standalone Activities are experimental. APIs may be subject to change. - */ -export type ActivityArgs> = T[N] extends ActivityFunction ? P : never; - -/** - * Utility type to support strong typing in {@link TypedActivityClient}. - * Extracts result type of an activity. - * @template T Activity interface - * @template N Activity name - * - * @experimental Standalone Activities are experimental. APIs may be subject to change. - */ -export type ActivityResult> = T[N] extends ActivityFunction ? R : never; - -/** - * Utility type to support strong typing in {@link TypedActivityClient}. - * Represents {@link ActivityOptions} with strongly typed arguments. - * @template Args Types of activity arguments as an array type. - * - * @experimental Standalone Activities are experimental. APIs may be subject to change. - */ -export type ActivityOptionsWithArgs = Args extends [any, ...any] - ? Replace< - ActivityOptions, - { - /** - * Arguments to pass to the Activity - */ - args: Args | Readonly; - } - > - : Replace< - ActivityOptions, - { - /** - * Arguments to pass to the Activity - */ - args?: Args | Readonly; - } - >; - -/** - * Utility type to support strong typing in {@link TypedActivityClient}. - * Represents {@link ActivityOptions} with strongly typed arguments matching specified Activity in specified interface. - * @template T Activity interface - * @template N Activity name - * - * @experimental Standalone Activities are experimental. APIs may be subject to change. - */ -export type ActivityOptionsFor> = ActivityOptionsWithArgs>; diff --git a/packages/client/src/activity-options.ts b/packages/client/src/activity-options.ts new file mode 100644 index 0000000000..d1b5203ca7 --- /dev/null +++ b/packages/client/src/activity-options.ts @@ -0,0 +1,149 @@ +import type { + ActivityFunction, + Priority, + RetryPolicy, + SearchAttributePair, + TypedSearchAttributes, +} from '@temporalio/common'; +import type { Duration } from '@temporalio/common/lib/time'; +import type { Replace } from '@temporalio/common/lib/type-helpers'; +import type { ActivityIdConflictPolicy, ActivityIdReusePolicy } from './types'; + +/** + * Options used by {@link ActivityClient.start}. + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export interface ActivityOptions { + /** + * Activity ID of the started activity. It's recommended to use a meaningful business ID. + */ + id: string; + /** + * Task queue to run this activity on. + */ + taskQueue: string; + /** + * Input arguments to pass to the activity. + */ + args?: any[] | Readonly; + /** + * If set, specifies maximum time between successful heartbeats. + */ + heartbeatTimeout?: Duration; + /** + * Controls how Activity is retried. If not set, the server will assign default retry policy. + */ + retry?: RetryPolicy; + /** + * Is set, specifies total time the activity is allowed to run, including retries. + * + * Note: it is required to set at least one of {@link startToCloseTimeout} and {@link scheduleToCloseTimeout}. + */ + startToCloseTimeout?: Duration; + /** + * If set, specifies maximum time the activity can wait in the task queue before being picked up by a worker. + * This timeout is non-retryable. + */ + scheduleToStartTimeout?: Duration; + /** + * If set, specifies maximum time for a single execution attempt. This timeout is retryable. + * + * Note: it is required to set at least one of {@link startToCloseTimeout} and {@link scheduleToCloseTimeout}. + */ + scheduleToCloseTimeout?: Duration; + /** + * A single-line fixed summary for this activity execution that may appear in UI/CLI. + * This can be in single-line Temporal markdown format. + */ + summary?: string; + /** + * Priority to use when starting this activity. + */ + priority?: Priority; + /** + * Time to wait before dispatching the first activity task. This delay is not applied to retry attempts. + */ + startDelay?: Duration; + /** + * Specifies behavior if there's a *closed* activity with the same ID. + */ + idReusePolicy?: ActivityIdReusePolicy; + /** + * Specifies behavior if there's a *running* activity with the same ID. Note that there can only be one running + * Activity for each Activity ID. + */ + idConflictPolicy?: ActivityIdConflictPolicy; + /** + * Search attributes for the activity. + */ + typedSearchAttributes?: SearchAttributePair[] | TypedSearchAttributes; +} + +/** + * Utility type to support strong typing in {@link TypedActivityClient}. + * Contains names of activities extracted from the specified activity interface. + * @template T Activity interface + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export type ActivityName = { + [N in keyof T & string]: T[N] extends ActivityFunction ? N : never; +}[keyof T & string]; + +/** + * Utility type to support strong typing in {@link TypedActivityClient}. + * Extracts argument types of an activity. + * @template T Activity interface + * @template N Activity name + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export type ActivityArgs> = T[N] extends ActivityFunction ? P : never; + +/** + * Utility type to support strong typing in {@link TypedActivityClient}. + * Extracts result type of an activity. + * @template T Activity interface + * @template N Activity name + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export type ActivityResult> = T[N] extends ActivityFunction ? R : never; + +/** + * Utility type to support strong typing in {@link TypedActivityClient}. + * Represents {@link ActivityOptions} with strongly typed arguments. + * @template Args Types of activity arguments as an array type. + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export type ActivityOptionsWithArgs = Args extends [any, ...any] + ? Replace< + ActivityOptions, + { + /** + * Arguments to pass to the Activity + */ + args: Args | Readonly; + } + > + : Replace< + ActivityOptions, + { + /** + * Arguments to pass to the Activity + */ + args?: Args | Readonly; + } + >; + +/** + * Utility type to support strong typing in {@link TypedActivityClient}. + * Represents {@link ActivityOptions} with strongly typed arguments matching specified Activity in specified interface. + * @template T Activity interface + * @template N Activity name + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export type ActivityOptionsFor> = ActivityOptionsWithArgs>; diff --git a/packages/client/src/index.ts b/packages/client/src/index.ts index 40e0fef991..e8c9857abb 100644 --- a/packages/client/src/index.ts +++ b/packages/client/src/index.ts @@ -55,6 +55,7 @@ export * from '@temporalio/common/lib/interfaces'; export * from '@temporalio/common/lib/workflow-handle'; export * from './async-completion-client'; export * from './activity-client'; +export * from './activity-options'; export * from './client'; export { Connection, diff --git a/packages/client/src/interceptors.ts b/packages/client/src/interceptors.ts index eae5e03667..7f4d2a5f42 100644 --- a/packages/client/src/interceptors.ts +++ b/packages/client/src/interceptors.ts @@ -26,7 +26,8 @@ import type { WorkflowExecution, } from './types'; import type { CompiledWorkflowOptions, WorkflowUpdateOptions } from './workflow-options'; -import type { ActivityHandle, ActivityOptions } from './activity-client'; +import type { ActivityHandle } from './activity-client'; +import type { ActivityOptions } from './activity-options'; export { Headers, Next }; diff --git a/packages/client/src/internal.ts b/packages/client/src/internal.ts index d981e6c042..bd46a8ba0a 100644 --- a/packages/client/src/internal.ts +++ b/packages/client/src/internal.ts @@ -1,36 +1,37 @@ import type { temporal } from '@temporalio/proto'; import type { WorkflowOptions } from './workflow-options'; +import type { ActivityOptions } from './activity-options'; /** - * A symbol used to attach extra, SDK-internal options to the `WorkflowClient.start()` call. + * A symbol used to attach extra, SDK-internal options to the `WorkflowClient.start()` + * or `ActivityClient.start()` call. * * These are notably used by the Temporal Nexus helpers. * * @internal * @hidden */ -export const InternalWorkflowStartOptionsSymbol = Symbol.for('__temporal_internal_client_workflow_start_options'); -export interface InternalWorkflowStartOptions extends WorkflowOptions { - [InternalWorkflowStartOptionsSymbol]?: { +export const InternalNexusStartOptionsSymbol = Symbol.for('__temporal_internal_client_nexus_start_options'); +export interface InternalNexusStartOptions { + [InternalNexusStartOptionsSymbol]?: { /** - * Request ID to be used for the workflow. + * Request ID to be used for the start call. */ requestId?: string; /** * Callbacks to be called by the server when this workflow reaches a terminal state. - * If the workflow continues-as-new, these callbacks will be carried over to the new execution. * Callback addresses must be whitelisted in the server's dynamic configuration. */ completionCallbacks?: temporal.api.common.v1.ICallback[]; /** - * Links to be associated with the workflow. + * Links to be associated with the workflow or activity execution. */ links?: temporal.api.common.v1.ILink[]; /** - * Backlink copied by the client from the StartWorkflowExecutionResponse. + * Backlink copied by the client from the start response. * Only populated in servers newer than 1.27. */ backLink?: temporal.api.common.v1.ILink; @@ -38,8 +39,11 @@ export interface InternalWorkflowStartOptions extends WorkflowOptions { /** * Conflict options for when USE_EXISTING is specified. * - * Used by the Nexus WorkflowRunOperations to attach to a callback to a running workflow. + * Used by the Nexus Operations to attach to a callback to a running execution (workflow or activity). */ onConflictOptions?: temporal.api.workflow.v1.IOnConflictOptions; }; } + +export type InternalWorkflowStartOptions = WorkflowOptions & InternalNexusStartOptions; +export type InternalActivityStartOptions = ActivityOptions & InternalNexusStartOptions; diff --git a/packages/client/src/workflow-client.ts b/packages/client/src/workflow-client.ts index 6e93602d4e..74309df594 100644 --- a/packages/client/src/workflow-client.ts +++ b/packages/client/src/workflow-client.ts @@ -94,7 +94,7 @@ import { BaseClient, defaultBaseClientOptions } from './base-client'; import { mapAsyncIterable } from './iterators-utils'; import { WorkflowUpdateStage, encodeWorkflowUpdateStage } from './workflow-update-stage'; import type { InternalWorkflowStartOptions } from './internal'; -import { InternalWorkflowStartOptionsSymbol } from './internal'; +import { InternalNexusStartOptionsSymbol } from './internal'; import { adaptWorkflowClientInterceptor } from './interceptor-adapters'; const UpdateWorkflowExecutionLifecycleStage = temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage; @@ -1293,7 +1293,7 @@ export class WorkflowClient extends BaseClient { protected async _startWorkflowHandler(input: WorkflowStartInput): Promise { const req = await this.createStartWorkflowRequest(input); const { options: opts, workflowType } = input; - const internalOptions = (opts as InternalWorkflowStartOptions)[InternalWorkflowStartOptionsSymbol]; + const internalOptions = (opts as InternalWorkflowStartOptions)[InternalNexusStartOptionsSymbol]; try { const response = await this.workflowService.startWorkflowExecution(req); if (internalOptions != null) { @@ -1320,7 +1320,7 @@ export class WorkflowClient extends BaseClient { const { identity, namespace } = this.options; const dataConverter = this.dataConverter; const context = this.workflowSerializationContext(opts.workflowId); - const internalOptions = (opts as InternalWorkflowStartOptions)[InternalWorkflowStartOptionsSymbol]; + const internalOptions = (opts as InternalWorkflowStartOptions)[InternalNexusStartOptionsSymbol]; const supportsEagerStart = (this.connection as InternalConnectionLike)?.[InternalConnectionLikeSymbol] ?.supportsEagerStart; diff --git a/packages/nexus/src/__tests__/test-nexus-token-helpers.ts b/packages/nexus/src/__tests__/test-nexus-token-helpers.ts index 1662806812..ab25bd3b49 100644 --- a/packages/nexus/src/__tests__/test-nexus-token-helpers.ts +++ b/packages/nexus/src/__tests__/test-nexus-token-helpers.ts @@ -34,12 +34,11 @@ test('decode Operation token errors', (t) => { }); test('decode workflow run Operation token errors', (t) => { - const invalidTypeToken = base64URLEncodeNoPadding('{"t":2,"ns":"ns"}'); + const invalidTypeToken = base64URLEncodeNoPadding('{"t":99,"ns":"ns"}'); t.throws(() => loadWorkflowRunOperationToken(invalidTypeToken), { // This currently fails on unknown token type as there are no other existing token types. // When new token types are added this regex will need to be updated to - // /invalid workflow token type: 2/ - message: /invalid operation token: unknown token type: 2/, + message: /invalid operation token: unknown token type: 99/, }); const missingWIDToken = base64URLEncodeNoPadding('{"t":1,"ns":"ns"}'); diff --git a/packages/nexus/src/index.ts b/packages/nexus/src/index.ts index 96f720bae4..e182458c95 100644 --- a/packages/nexus/src/index.ts +++ b/packages/nexus/src/index.ts @@ -17,11 +17,15 @@ export { export { startWorkflow, + type ActivityOptions as ActivityStartOptions, + type ActivityOptionsFor as ActivityStartOptionsFor, + type CancelActivityOptions, type CancelWorkflowRunOptions, type TemporalOperationHandlerOptions, TemporalOperationHandler, TemporalOperationResult, type TemporalNexusClient, + type NexusTypedActivityClient, type TemporalOperationStartHandler, WorkflowHandle, WorkflowRunOperationHandler, diff --git a/packages/nexus/src/token.ts b/packages/nexus/src/token.ts index 486fe36cb9..bf2a30b891 100644 --- a/packages/nexus/src/token.ts +++ b/packages/nexus/src/token.ts @@ -22,9 +22,19 @@ export interface OperationToken { ns: string; /** - * ID of the workflow. + * ID of a workflow for OperationTokenType.WORKFLOW_RUN. */ wid?: string; + + /** + * ID of an activity for OperationTokenType.ACTIVITY. + */ + aid?: string; + + /** + * Run ID of an activity for OperationTokenType.ACTIVITY. + */ + rid?: string; } /** @@ -38,9 +48,20 @@ export interface WorkflowRunOperationToken extends OperationToken { wid: string; } +/** + * An OperationToken that identifies an Activity operation. + * + * @internal + * @hidden + */ +export interface ActivityOperationToken extends OperationToken { + t: typeof OperationTokenType.ACTIVITY; + aid: string; + rid: string; +} + /** * OperationTokenType is used to identify the type of Operation token. - * Currently, we only have one type of Operation token: WorkflowRun. * * @internal * @hidden @@ -53,6 +74,7 @@ export type OperationTokenType = (typeof OperationTokenType)[keyof typeof Operat */ export const OperationTokenType = { WORKFLOW_RUN: 1, + ACTIVITY: 2, } as const; /** @@ -67,6 +89,19 @@ export function generateWorkflowRunOperationToken(namespace: string, workflowId: return base64URLEncodeNoPadding(JSON.stringify(token)); } +/** + * Generate an activity Operation token. + */ +export function generateActivityOperationToken(namespace: string, activityId: string, runId: string): string { + const token: ActivityOperationToken = { + t: OperationTokenType.ACTIVITY, + ns: namespace, + aid: activityId, + rid: runId, + }; + return base64URLEncodeNoPadding(JSON.stringify(token)); +} + /** * Load and validate the common fields of an Operation token. */ @@ -129,6 +164,21 @@ export function assertWorkflowRunOperationToken(token: OperationToken): asserts } } +/** + * Assert that an OperationToken identifies an activity. + */ +export function assertActivityOperationToken(token: OperationToken): asserts token is ActivityOperationToken { + if (token.t !== OperationTokenType.ACTIVITY) { + throw new TypeError(`invalid activity token type: ${token.t}, expected: ${OperationTokenType.ACTIVITY}`); + } + if (!token.aid || typeof token.aid !== 'string') { + throw new TypeError('invalid activity token: missing activity ID (aid)'); + } + if (!token.rid || typeof token.rid !== 'string') { + throw new TypeError('invalid activity token: missing activity run ID (rid)'); + } +} + function isOperationTokenType(value: number): value is OperationTokenType { return Object.values(OperationTokenType).includes(value as OperationTokenType); } diff --git a/packages/nexus/src/workflow-helpers.ts b/packages/nexus/src/workflow-helpers.ts index caf9ae18ae..2773e10054 100644 --- a/packages/nexus/src/workflow-helpers.ts +++ b/packages/nexus/src/workflow-helpers.ts @@ -1,14 +1,24 @@ import * as nexus from 'nexus-rpc'; import type { Workflow, WorkflowResultType } from '@temporalio/common'; import type { Replace } from '@temporalio/common/lib/type-helpers'; -import type { Client, WorkflowStartOptions as ClientWorkflowStartOptions } from '@temporalio/client'; +import type { + ActivityHandle, + ActivityName, + ActivityOptions as ClientActivityOptions, + ActivityOptionsFor as ClientActivityOptionsFor, + ActivityResult, + Client, + WorkflowStartOptions as ClientWorkflowStartOptions, +} from '@temporalio/client'; import { type temporal } from '@temporalio/proto'; -import type { InternalWorkflowStartOptions } from '@temporalio/client/lib/internal'; -import { InternalWorkflowStartOptionsSymbol } from '@temporalio/client/lib/internal'; +import type { InternalActivityStartOptions, InternalNexusStartOptions } from '@temporalio/client/lib/internal'; +import { InternalNexusStartOptionsSymbol } from '@temporalio/client/lib/internal'; import { convertNexusLinkToTemporalLink, convertTemporalLinkToNexusLink } from './link-converter'; import { + assertActivityOperationToken, assertWorkflowRunOperationToken, generateWorkflowRunOperationToken, + generateActivityOperationToken, loadOperationToken, loadWorkflowRunOperationToken, OperationTokenType, @@ -24,6 +34,52 @@ import { declare const isNexusWorkflowHandle: unique symbol; declare const workflowResultType: unique symbol; +async function startWithNexusOptions( + ctx: nexus.StartOperationContext, + handler: (opts: InternalNexusStartOptions[typeof InternalNexusStartOptionsSymbol]) => Promise +) { + const links = Array(); + if (ctx.inboundLinks?.length > 0) { + for (const l of ctx.inboundLinks) { + try { + links.push(convertNexusLinkToTemporalLink(l)); + } catch (error) { + log.warn('failed to convert Nexus link to Workflow event link', { error }); + } + } + } + + const internalOptions: InternalNexusStartOptions[typeof InternalNexusStartOptionsSymbol] = { + links, + requestId: ctx.requestId, + onConflictOptions: { + attachLinks: true, + attachCompletionCallbacks: true, + attachRequestId: true, + }, + }; + + if (ctx.callbackUrl) { + internalOptions.completionCallbacks = [ + { + nexus: { url: ctx.callbackUrl, header: ctx.callbackHeaders }, + links, // pass in links here as well for older servers, newer servers dedupe them. + }, + ]; + } + + const result = await handler(internalOptions); + if (internalOptions.backLink != null) { + try { + ctx.outboundLinks.push(convertTemporalLinkToNexusLink(internalOptions.backLink)); + } catch (error) { + log.warn('failed to convert temporal link to Nexus link', { error }); + } + } + + return result; +} + /** * A handle to a running workflow that is returned by the {@link startWorkflow} helper. * This handle should be returned by {@link WorkflowRunOperationStartHandler} implementations. @@ -84,57 +140,23 @@ export async function startWorkflow( workflowTypeOrFunc: string | T, workflowOptions: WorkflowStartOptions ): Promise>> { - const { client, taskQueue } = getHandlerContext(); - const links = Array(); - if (ctx.inboundLinks?.length > 0) { - for (const l of ctx.inboundLinks) { - try { - links.push(convertNexusLinkToTemporalLink(l)); - } catch (error) { - log.warn('failed to convert Nexus link to Workflow event link', { error }); - } - } - } - const internalOptions: InternalWorkflowStartOptions[typeof InternalWorkflowStartOptionsSymbol] = { - links, - requestId: ctx.requestId, - }; - - internalOptions.onConflictOptions = { - attachLinks: true, - attachCompletionCallbacks: true, - attachRequestId: true, - }; - - if (ctx.callbackUrl) { - internalOptions.completionCallbacks = [ - { - nexus: { url: ctx.callbackUrl, header: ctx.callbackHeaders }, - links, // pass in links here as well for older servers, newer servers dedupe them. - }, - ]; - } - - const { taskQueue: userSpecifiedTaskQueue, ...rest } = workflowOptions; - const startOptions: ClientWorkflowStartOptions = { - ...rest, - taskQueue: userSpecifiedTaskQueue || taskQueue, - [InternalWorkflowStartOptionsSymbol]: internalOptions, - }; + return await startWithNexusOptions(ctx, async (internalOptions) => { + const { client, taskQueue } = getHandlerContext(); + + const { taskQueue: userSpecifiedTaskQueue, ...rest } = workflowOptions; + const startOptions: ClientWorkflowStartOptions = { + ...rest, + taskQueue: userSpecifiedTaskQueue || taskQueue, + [InternalNexusStartOptionsSymbol]: internalOptions, + }; - const handle = await client.workflow.start(workflowTypeOrFunc, startOptions); - if (internalOptions.backLink != null) { - try { - ctx.outboundLinks.push(convertTemporalLinkToNexusLink(internalOptions.backLink)); - } catch (error) { - log.warn('failed to convert temporal link to Nexus link', { error }); - } - } + const handle = await client.workflow.start(workflowTypeOrFunc, startOptions); - return { - workflowId: handle.workflowId, - runId: handle.firstExecutionRunId, - } as WorkflowHandle>; + return { + workflowId: handle.workflowId, + runId: handle.firstExecutionRunId, + } as WorkflowHandle>; + }); } /** @@ -167,6 +189,66 @@ export class WorkflowRunOperationHandler implements nexus.OperationHandler } } +/** + * Options for starting an untyped activity using {@link TemporalNexusClient.startActivity}, this type is identical to the + * client's `ActivityOptions` with the exception that `taskQueue` is optional and defaults + * to the current worker's task queue. + * + * @experimental Nexus support in Temporal SDK is experimental. + */ +export type ActivityOptions = Replace; + +/** + * Options for starting a typed activity using {@link NexusTypedActivityClient.startActivity}, this type is identical to the + * client's `ActivityOptionsFor` with the exception that `taskQueue` is optional and defaults + * to the current worker's task queue. + * + * @experimental Nexus support in Temporal SDK is experimental. + */ +export type ActivityOptionsFor> = Replace< + ClientActivityOptionsFor, + { taskQueue?: string } +>; + +/** + * Starts an activity for a {@link TemporalNexusClient.startActivity}, linking the execution chain + * to a Nexus Operation. Automatically propagates the callback, request ID, and back and forward + * links from the Nexus options to the Activity. + * + * @experimental Nexus support in Temporal SDK is experimental. + */ +async function startActivity( + ctx: TemporalStartOperationContext, + activity: string, + activityOptions: ActivityOptions +): Promise> { + return await startWithNexusOptions(ctx, async (internalOptions) => { + const { client, taskQueue } = getHandlerContext(); + + const { taskQueue: userSpecifiedTaskQueue, ...rest } = activityOptions; + const startOptions: InternalActivityStartOptions = { + ...rest, + taskQueue: userSpecifiedTaskQueue || taskQueue, + [InternalNexusStartOptionsSymbol]: internalOptions, + }; + + try { + return await client.activity.start(activity, startOptions); + } catch (err) { + // ActivityClient.validateActivityOptions throws TypeError if there are bad options specified. + // e.g. missing startToCloseTimeout or scheduleToCloseTimeout + // If uncaught, causes Nexus operations to retry until timeout or circuit breaker trips and + // reason is not easily discoverable. + if (err instanceof TypeError) { + throw new nexus.HandlerError(nexus.HandlerErrorType.BAD_REQUEST, `Failed to start activity: ${err.message}`, { + cause: err, + }); + } + throw err; + } + }); +} + /** * Module-private brand and payload key for {@link TemporalOperationResult}. */ @@ -218,6 +300,34 @@ export interface TemporalNexusClient { workflowTypeOrFunc: string | T, workflowOptions: WorkflowStartOptions ): Promise>>; + + startActivity(activity: string, options: ActivityOptions): Promise>; + + /** + * Returns this client as a {@link NexusTypedActivityClient}. It enables strong type checking of Activity name, arguments + * and result based on the provided Activity interface. Note that no new client object is created - this method only + * affects type annotations. + * @template T Activity interface to use for type checking. The returned client can only start activities present in + * this interface. + * + * @experimental Nexus support in Temporal SDK is experimental. + */ + typedActivity(): NexusTypedActivityClient; +} + +/** + * Sub-interface of {@link TemporalNexusClient} that provides a strongly-typed interface for executing Activities. + * Argument types in the provided options must match the argument types of the specified Activity as defined in provided + * interface + * @template T Activity interface + * + * @experimental Nexus support in Temporal SDK is experimental. + */ +export interface NexusTypedActivityClient { + startActivity>( + activity: N, + options: ActivityOptionsFor + ): Promise>>; } class TemporalNexusClientImpl implements TemporalNexusClient { @@ -250,6 +360,25 @@ class TemporalNexusClientImpl implements TemporalNexusClient { }); } + public async startActivity( + activity: string, + options: Replace + ): Promise> { + return await this.withAsyncOperationStartReservation(async () => { + const handle = await startActivity(this.startOperationContext, activity, options); + const { namespace } = getHandlerContext(); + + // handle.runId is always populated when starting an activity + // it's safe to use non-null assertion + const token = generateActivityOperationToken(namespace, handle.activityId, handle.runId!); + return TemporalOperationResult.async(token); + }); + } + + public typedActivity(): NexusTypedActivityClient { + return this; + } + private async withAsyncOperationStartReservation(fn: () => Promise): Promise { if (this.asyncOperationStarted) { throw new nexus.HandlerError( @@ -292,6 +421,24 @@ export interface CancelWorkflowRunOptions { readonly workflowId: string; } +/** + * Options passed to a {@link TemporalOperationHandlerOptions.cancelActivity} handler describing + * the activity to cancel. + * + * @experimental Nexus support in Temporal SDK is experimental. + */ +export interface CancelActivityOptions { + /** + * The ID of the activity backing the Nexus Operation that is being canceled. + */ + readonly activityId: string; + + /** + * The run ID of the activity backing the Nexus Operation that is being canceled. + */ + readonly runId: string; +} + /** * Options for customizing a {@link TemporalOperationHandler}. * @@ -299,6 +446,7 @@ export interface CancelWorkflowRunOptions { */ export interface TemporalOperationHandlerOptions { cancelWorkflowRun?: (ctx: TemporalCancelOperationContext, options: CancelWorkflowRunOptions) => Promise; + cancelActivity?: (ctx: TemporalCancelOperationContext, options: CancelActivityOptions) => Promise; } /** @@ -309,10 +457,12 @@ export interface TemporalOperationHandlerOptions { export class TemporalOperationHandler implements nexus.OperationHandler { private readonly startHandler: TemporalOperationStartHandler; private readonly cancelWorkflowRunHandler: NonNullable; + private readonly cancelActivityHandler: NonNullable; constructor(options: { start: TemporalOperationStartHandler } & TemporalOperationHandlerOptions) { this.startHandler = options.start; this.cancelWorkflowRunHandler = options.cancelWorkflowRun ?? defaultCancelWorkflowRun; + this.cancelActivityHandler = options.cancelActivity ?? defaultCancelActivity; } async start(ctx: nexus.StartOperationContext, input: I): Promise> { @@ -339,6 +489,16 @@ export class TemporalOperationHandler implements nexus.OperationHandler implements nexus.OperationHandler(), doubleStartOp: nexus.operation(), retryAfterFailedStartOp: nexus.operation(), + echoActivity: nexus.operation(), }); const temporalCancelOpService = nexus.service('temporalCancelOperationService', { @@ -62,6 +64,7 @@ function makeTemporalOpServiceHandler(overrides: Partial (await targetHandle.describe()).status.name === 'CANCELLED', 4000); }); }); + +const activities = { + echo, + throwAnError, +}; + +test('TemporalOperationHandler start typed standalone activity', async (t) => { + const { createWorker, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + const { client } = t.context.env; + + const worker = await createWorker({ + activities, + nexusServices: [ + makeTemporalOpServiceHandler({ + echoActivity: new temporalnexus.TemporalOperationHandler({ + async start(_ctx, client, input) { + return await client.typedActivity().startActivity('echo', { + id: randomUUID(), + args: [input], + scheduleToCloseTimeout: '10s', + }); + }, + }), + }), + ], + }); + + await worker.runUntil(async () => { + const nexusSvc = client.nexus.createServiceClient({ endpoint: endpointName, service: temporalOpService }); + const result = await nexusSvc.executeOperation(temporalOpService.operations.echoActivity, 'foo', { + id: randomUUID(), + }); + t.is(result, 'foo'); + }); +}); + +test('TemporalOperationHandler start untyped standalone activity', async (t) => { + const { createWorker, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + const { client } = t.context.env; + + const worker = await createWorker({ + activities, + nexusServices: [ + makeTemporalOpServiceHandler({ + echoActivity: new temporalnexus.TemporalOperationHandler({ + async start(_ctx, client, input) { + return await client.startActivity('echo', { + id: randomUUID(), + args: [input], + scheduleToCloseTimeout: '10s', + }); + }, + }), + }), + ], + }); + + await worker.runUntil(async () => { + const nexusSvc = client.nexus.createServiceClient({ endpoint: endpointName, service: temporalOpService }); + const result = await nexusSvc.executeOperation(temporalOpService.operations.echoActivity, 'foo', { + id: randomUUID(), + }); + t.is(result, 'foo'); + }); +}); From 39ac1a1bc68142efb3a1777900df76edc566f884 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Tue, 2 Jun 2026 13:30:01 -0700 Subject: [PATCH 09/12] Add some cancel activity tests --- .../test/src/test-nexus-temporal-operation.ts | 106 ++++++++++++++++++ 1 file changed, 106 insertions(+) diff --git a/packages/test/src/test-nexus-temporal-operation.ts b/packages/test/src/test-nexus-temporal-operation.ts index 2c92cd8faf..e5d95b62fb 100644 --- a/packages/test/src/test-nexus-temporal-operation.ts +++ b/packages/test/src/test-nexus-temporal-operation.ts @@ -15,6 +15,7 @@ import { helpers, makeTestFunction } from './helpers-integration'; import { innermostHandlerError } from './helpers-nexus'; import { waitUntil } from './helpers'; import { echo, throwAnError } from './activities'; +import { Context } from '@temporalio/activity'; const test = makeTestFunction({ workflowsPath: __filename, @@ -41,6 +42,7 @@ const temporalOpService = nexus.service('temporalOperationService', { doubleStartOp: nexus.operation(), retryAfterFailedStartOp: nexus.operation(), echoActivity: nexus.operation(), + blockingActivity: nexus.operation(), }); const temporalCancelOpService = nexus.service('temporalCancelOperationService', { @@ -65,6 +67,7 @@ function makeTemporalOpServiceHandler(overrides: Partial { @@ -485,3 +495,99 @@ test('TemporalOperationHandler start untyped standalone activity', async (t) => t.is(result, 'foo'); }); }); + +test('TemporalOperationHandler cancels backing activity', async (t) => { + const { createWorker, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + const { client } = t.context.env; + + let startedActivity = false; + + const worker = await createWorker({ + activities, + nexusServices: [ + makeTemporalOpServiceHandler({ + blockingActivity: new temporalnexus.TemporalOperationHandler({ + async start(_ctx, client, input) { + const result = await client.typedActivity().startActivity('waitForCancellation', { + id: input, + scheduleToCloseTimeout: '10s', + }); + startedActivity = true; + return result; + }, + }), + }), + ], + }); + + await worker.runUntil(async () => { + const nexusSvc = client.nexus.createServiceClient({ endpoint: endpointName, service: temporalOpService }); + const targetActivityId = `wait-for-cancel-${randomUUID()}`; + const handle = await nexusSvc.startOperation(temporalOpService.operations.blockingActivity, targetActivityId, { + id: randomUUID(), + }); + await waitUntil(async () => startedActivity, 4000); + + const activityHandle = client.activity.getHandle(targetActivityId); + + await handle.cancel(); + + await waitUntil(async () => (await handle.describe()).status === 'CANCELED', 4000); + await waitUntil(async () => (await activityHandle.describe()).status === 'CANCELED', 4000); + + // Assertions built into the waitUntils + t.pass(); + }); +}); + +test('TemporalOperationHandler invokes custom cancel', async (t) => { + const { createWorker, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + const { client } = t.context.env; + + let activityStarted = false; + let customCancelCalled = false; + + const worker = await createWorker({ + activities, + nexusServices: [ + makeTemporalOpServiceHandler({ + blockingActivity: new temporalnexus.TemporalOperationHandler({ + async start(_ctx, client, input) { + const result = await client.typedActivity().startActivity('waitForCancellation', { + id: input, + scheduleToCloseTimeout: '10s', + }); + activityStarted = true; + return result; + }, + async cancelActivity(ctx, { activityId, runId }) { + console.log('activityId', activityId); + console.log('runId', runId); + const handle = temporalnexus.getClient().activity.getHandle(activityId, runId); + await handle.cancel('test custom cancellation'); + customCancelCalled = true; + }, + }), + }), + ], + }); + + await worker.runUntil(async () => { + const nexusSvc = client.nexus.createServiceClient({ endpoint: endpointName, service: temporalOpService }); + const targetActivityId = `wait-for-cancel-${randomUUID()}`; + const result = await nexusSvc.startOperation(temporalOpService.operations.blockingActivity, targetActivityId, { + id: randomUUID(), + }); + + await waitUntil(async () => activityStarted, 4000); + + const activityHandle = client.activity.getHandle(targetActivityId); + await result.cancel(); + + await waitUntil(async () => (await result.describe()).status === 'CANCELED', 4000); + await waitUntil(async () => (await activityHandle.describe()).status === 'CANCELED', 4000); + t.true(customCancelCalled, 'expected custom cancel to be called'); + }); +}); From e33ea816034b16f599f72878ff8270924fa372e9 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Tue, 2 Jun 2026 17:03:20 -0700 Subject: [PATCH 10/12] Expand test coverage --- .../src/__tests__/test-nexus-token-helpers.ts | 34 ++++ .../test/src/test-nexus-temporal-operation.ts | 174 +++++++++++++----- 2 files changed, 158 insertions(+), 50 deletions(-) diff --git a/packages/nexus/src/__tests__/test-nexus-token-helpers.ts b/packages/nexus/src/__tests__/test-nexus-token-helpers.ts index ab25bd3b49..bce7abac61 100644 --- a/packages/nexus/src/__tests__/test-nexus-token-helpers.ts +++ b/packages/nexus/src/__tests__/test-nexus-token-helpers.ts @@ -1,6 +1,8 @@ import test from 'ava'; import { + assertActivityOperationToken, base64URLEncodeNoPadding, + generateActivityOperationToken, generateWorkflowRunOperationToken, loadOperationToken, loadWorkflowRunOperationToken, @@ -17,6 +19,19 @@ test('encode and decode workflow run Operation token', (t) => { t.deepEqual(decoded, expected); }); +test('encode and decode activity Operation token', (t) => { + const expected = { + t: 2, + ns: 'ns', + aid: 'a', + rid: 'r', + }; + const token = generateActivityOperationToken('ns', 'a', 'r'); + const decoded = loadOperationToken(token); + assertActivityOperationToken(decoded); + t.deepEqual(decoded, expected); +}); + test('decode Operation token errors', (t) => { t.throws(() => loadOperationToken(''), { message: /invalid operation token: token is empty/ }); @@ -46,3 +61,22 @@ test('decode workflow run Operation token errors', (t) => { message: /invalid workflow run token: missing workflow ID \(wid\)/, }); }); + +test('decode activity Operation token errors', (t) => { + const missingAIDToken = base64URLEncodeNoPadding('{"t":2,"ns":"ns","rid":"r"}'); + t.throws(() => assertActivityOperationToken(loadOperationToken(missingAIDToken)), { + message: /invalid activity token: missing activity ID \(aid\)/, + }); + + const missingRIDToken = base64URLEncodeNoPadding('{"t":2,"ns":"ns","aid":"a"}'); + t.throws(() => assertActivityOperationToken(loadOperationToken(missingRIDToken)), { + message: /invalid activity token: missing activity run ID \(rid\)/, + }); +}); + +test('loadWorkflowRunOperationToken rejects activity token', (t) => { + const activityToken = generateActivityOperationToken('ns', 'a', 'r'); + t.throws(() => loadWorkflowRunOperationToken(activityToken), { + message: /invalid workflow token type: 2, expected: 1/, + }); +}); diff --git a/packages/test/src/test-nexus-temporal-operation.ts b/packages/test/src/test-nexus-temporal-operation.ts index e5d95b62fb..6170641a9c 100644 --- a/packages/test/src/test-nexus-temporal-operation.ts +++ b/packages/test/src/test-nexus-temporal-operation.ts @@ -9,13 +9,13 @@ import { } from '@temporalio/client'; import * as temporalnexus from '@temporalio/nexus'; import { asyncLocalStorage } from '@temporalio/nexus/lib/context'; -import { base64URLEncodeNoPadding } from '@temporalio/nexus/lib/token'; +import { base64URLEncodeNoPadding, OperationTokenType } from '@temporalio/nexus/lib/token'; import * as workflow from '@temporalio/workflow'; +import { Context } from '@temporalio/activity'; import { helpers, makeTestFunction } from './helpers-integration'; import { innermostHandlerError } from './helpers-nexus'; import { waitUntil } from './helpers'; import { echo, throwAnError } from './activities'; -import { Context } from '@temporalio/activity'; const test = makeTestFunction({ workflowsPath: __filename, @@ -118,6 +118,21 @@ export async function blockingTargetWorkflow(): Promise { await workflow.condition(() => false); } +//////////////////////////////////////////////////////////////////////////////////////////////////// +// Activities + +const activities = { + echo, + throwAnError, + async waitForCancellation() { + const cx = Context.current(); + while (true) { + await cx.sleep(300); + await cx.heartbeat(); + } + }, +}; + //////////////////////////////////////////////////////////////////////////////////////////////////// // Tests @@ -256,6 +271,50 @@ test('TemporalOperationHandler.cancel rejects invalid operation token type befor t.regex(err?.message ?? '', /invalid operation token/); }); +test('TemporalOperationHandler.cancel rejects malformed activity token before invoking cancelActivity', async (t) => { + let cancelActivityCalled = false; + const handler = new temporalnexus.TemporalOperationHandler({ + async start(_ctx, _client, _input) { + return temporalnexus.TemporalOperationResult.sync(undefined); + }, + + async cancelActivity(_ctx, _options) { + cancelActivityCalled = true; + throw new Error('cancelActivity should not be called'); + }, + }); + const token = base64URLEncodeNoPadding(JSON.stringify({ t: OperationTokenType.ACTIVITY, ns: 'test-namespace' })); + + const err = await asyncLocalStorage.run( + { + client: undefined as any, + endpoint: 'test-endpoint', + namespace: 'test-namespace', + taskQueue: 'test-task-queue', + log: undefined as any, + metrics: undefined as any, + }, + async () => { + return await t.throwsAsync( + handler.cancel( + { + abortSignal: new AbortController().signal, + headers: {}, + operation: 'operation', + service: 'service', + }, + token + ) + ); + } + ); + + assert(err instanceof nexus.HandlerError); + t.is(err.type, 'BAD_REQUEST'); + t.regex(err.message, /invalid activity operation token/); + t.false(cancelActivityCalled, 'cancelActivity must not be invoked for a malformed activity token'); +}); + test('TemporalOperationHandler async and sync happy paths - caller workflow', async (t) => { const { createWorker, executeWorkflow, registerNexusEndpoint } = helpers(t); const { endpointName } = await registerNexusEndpoint(); @@ -293,47 +352,74 @@ test('TemporalOperationHandler async and sync happy paths - caller workflow', as }); }); -test('TemporalOperationHandler rejects multiple async starts', async (t) => { - const { createWorker, executeWorkflow, registerNexusEndpoint } = helpers(t); - const { endpointName } = await registerNexusEndpoint(); +// A single backing-operation start, as invoked from inside a start handler. The result is +// awaited then discarded because the parameterized guard test always throws afterwards. +type StartAction = (client: temporalnexus.TemporalNexusClient, input: string) => Promise; - const worker = await createWorker({ - nexusServices: [ - makeTemporalOpServiceHandler({ - doubleStartOp: new temporalnexus.TemporalOperationHandler({ - async start(_ctx, client, input) { - await client.startWorkflow(echoWorkflow, { - workflowId: randomUUID(), - args: [input], - }); - await client.startWorkflow(echoWorkflow, { - workflowId: randomUUID(), - args: [input], - }); - throw new nexus.HandlerError('INTERNAL', 'expected previous error to be thrown'); - }, - }), - }), - ], +const startWorkflowAction: StartAction = (client, input) => + client.startWorkflow(echoWorkflow, { + workflowId: randomUUID(), + args: [input], }); - await worker.runUntil(async () => { - const err = await t.throwsAsync( - () => - executeWorkflow(temporalDoubleStartOpCaller, { - args: [endpointName], +const startActivityAction: StartAction = (client, input) => + client.startActivity('echo', { + id: randomUUID(), + args: [input], + scheduleToCloseTimeout: '10s', + }); + +// The shared multiple-async-start guard (withAsyncOperationStartReservation) must reject a +// second backing-operation start regardless of which start kinds are combined. Each case's +// `name` becomes part of the test title and the derived Nexus endpoint name, so it must use +// only characters the endpoint-name transform sanitizes (letters/digits/spaces/parens/hyphens); +// avoid '+', '&', etc., which leak through and fail endpoint registration. Add a row to cover a +// new combination, or a new StartAction const to cover a new start kind. +const multipleAsyncStartCases: { name: string; first: StartAction; second: StartAction }[] = [ + { name: 'workflow then workflow', first: startWorkflowAction, second: startWorkflowAction }, + { name: 'activity then activity', first: startActivityAction, second: startActivityAction }, + { name: 'workflow then activity', first: startWorkflowAction, second: startActivityAction }, + { name: 'activity then workflow', first: startActivityAction, second: startWorkflowAction }, +]; + +for (const { name, first, second } of multipleAsyncStartCases) { + test(`TemporalOperationHandler rejects multiple async starts (${name})`, async (t) => { + const { createWorker, executeWorkflow, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + + const worker = await createWorker({ + activities, + nexusServices: [ + makeTemporalOpServiceHandler({ + doubleStartOp: new temporalnexus.TemporalOperationHandler({ + async start(_ctx, client, input) { + await first(client, input); + await second(client, input); // guard trips here + throw new nexus.HandlerError('INTERNAL', 'expected previous error to be thrown'); + }, + }), }), - { - instanceOf: WorkflowFailedError, - } - ); - assert(err?.cause instanceof NexusOperationFailure); - assert(err.cause.cause instanceof nexus.HandlerError); - const inner = innermostHandlerError(err.cause.cause); - t.is(inner.type, 'BAD_REQUEST'); - t.regex(inner.message, /Only one async operation can be started per operation handler invocation/); + ], + }); + + await worker.runUntil(async () => { + const err = await t.throwsAsync( + () => + executeWorkflow(temporalDoubleStartOpCaller, { + args: [endpointName], + }), + { + instanceOf: WorkflowFailedError, + } + ); + assert(err?.cause instanceof NexusOperationFailure); + assert(err.cause.cause instanceof nexus.HandlerError); + const inner = innermostHandlerError(err.cause.cause); + t.is(inner.type, 'BAD_REQUEST'); + t.regex(inner.message, /Only one async operation can be started per operation handler invocation/); + }); }); -}); +} test('TemporalOperationHandler allows retry after failed async start', async (t) => { const { createWorker, executeWorkflow, startWorkflow, registerNexusEndpoint } = helpers(t); @@ -422,18 +508,6 @@ test('TemporalOperationHandler default cancelWorkflowRun cancels backing workflo }); }); -const activities = { - echo, - throwAnError, - async waitForCancellation() { - const cx = Context.current(); - while (true) { - await cx.sleep(300); - await cx.heartbeat(); - } - }, -}; - test('TemporalOperationHandler start typed standalone activity', async (t) => { const { createWorker, registerNexusEndpoint } = helpers(t); const { endpointName } = await registerNexusEndpoint(); From 146f19dca6c5ea773569b112bb066d596466cbef Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Thu, 4 Jun 2026 17:58:17 -0700 Subject: [PATCH 11/12] expand test coverage --- .../test/src/test-nexus-temporal-operation.ts | 144 +++++++++++++++++- 1 file changed, 141 insertions(+), 3 deletions(-) diff --git a/packages/test/src/test-nexus-temporal-operation.ts b/packages/test/src/test-nexus-temporal-operation.ts index 02c1dc7c28..f5220c6c5b 100644 --- a/packages/test/src/test-nexus-temporal-operation.ts +++ b/packages/test/src/test-nexus-temporal-operation.ts @@ -1,9 +1,10 @@ import assert from 'assert'; import { randomUUID } from 'crypto'; import * as nexus from 'nexus-rpc'; -import { NexusOperationFailure } from '@temporalio/common'; +import { ApplicationFailure, NexusOperationFailure } from '@temporalio/common'; import { NexusOperationExecutionStatus, + NexusOperationFailureError, WorkflowExecutionAlreadyStartedError, WorkflowFailedError, } from '@temporalio/client'; @@ -47,6 +48,7 @@ const temporalOpService = nexus.service('temporalOperationService', { doubleStartOp: nexus.operation(), retryAfterFailedStartOp: nexus.operation(), echoActivity: nexus.operation(), + failingActivity: nexus.operation(), blockingActivity: nexus.operation(), }); @@ -72,6 +74,7 @@ function makeTemporalOpServiceHandler(overrides: Partial { + // echo(message?: string): Promise, so typedActivity().startActivity('echo', ...) + // resolves to TemporalOperationResult and the operation output type infers as string. + const _activityStringOp: nexus.OperationHandler = new temporalnexus.TemporalOperationHandler({ + async start(_ctx, client, input: string) { + return await client.typedActivity().startActivity('echo', { + id: 'test', + args: [input], + scheduleToCloseTimeout: '10s', + }); + }, + }); + + // @ts-expect-error - Output type should be string (echo returns string), not number + const _activityMismatchedOp: nexus.OperationHandler = new temporalnexus.TemporalOperationHandler({ + async start(_ctx, client, input: string) { + return await client.typedActivity().startActivity('echo', { + id: 'test', + args: [input], + scheduleToCloseTimeout: '10s', + }); + }, + }); + + const _activityWrongArgsOp: nexus.OperationHandler = new temporalnexus.TemporalOperationHandler({ + async start(_ctx, client, _input: string) { + return await client.typedActivity().startActivity('echo', { + id: 'test', + // @ts-expect-error - echo expects a string argument, not a number + args: [42], + scheduleToCloseTimeout: '10s', + }); + }, + }); + + const _explicitActivityStringOp: nexus.OperationHandler = new temporalnexus.TemporalOperationHandler< + string, + string + >({ + async start(_ctx, client, input) { + return await client.typedActivity().startActivity('echo', { + id: 'test', + args: [input], + scheduleToCloseTimeout: '10s', + }); + }, + }); + + // This test only checks for compile-time errors. + t.pass(); +}); + test('TemporalOperationHandler cancel delegates to provided cancelWorkflowRun handler', async (t) => { const { createWorker, registerNexusEndpoint } = helpers(t); const { endpointName } = await registerNexusEndpoint(); @@ -658,6 +713,91 @@ test('TemporalOperationHandler start untyped standalone activity', async (t) => }); }); +test('TemporalOperationHandler converts invalid activity options to BAD_REQUEST', async (t) => { + const { createWorker, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + const { client } = t.context.env; + + const worker = await createWorker({ + activities, + nexusServices: [ + makeTemporalOpServiceHandler({ + echoActivity: new temporalnexus.TemporalOperationHandler({ + async start(_ctx, client, input) { + // Intentionally omit scheduleToCloseTimeout/startToCloseTimeout. Activity option + // validation throws a TypeError, which startActivity converts into a BAD_REQUEST + // HandlerError so the caller gets an actionable, non-retryable error instead of the + // operation silently retrying until it times out. + return await client.startActivity('echo', { + id: randomUUID(), + args: [input], + }); + }, + }), + }), + ], + }); + + await worker.runUntil(async () => { + const nexusSvc = client.nexus.createServiceClient({ endpoint: endpointName, service: temporalOpService }); + const err = await t.throwsAsync( + nexusSvc.executeOperation(temporalOpService.operations.echoActivity, 'foo', { + id: randomUUID(), + }), + { instanceOf: NexusOperationFailureError } + ); + assert(err?.cause instanceof nexus.HandlerError); + const inner = innermostHandlerError(err.cause); + t.is(inner.type, 'BAD_REQUEST'); + t.regex( + inner.message, + /Failed to start activity: Either scheduleToCloseTimeout or startToCloseTimeout is required/ + ); + }); +}); + +test('TemporalOperationHandler propagates backing activity failure', async (t) => { + const { createWorker, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + const { client } = t.context.env; + + const worker = await createWorker({ + activities, + nexusServices: [ + makeTemporalOpServiceHandler({ + failingActivity: new temporalnexus.TemporalOperationHandler({ + async start(_ctx, client, message) { + // throwAnError(true, message) throws a non-retryable ApplicationFailure, so the + // backing activity fails permanently and the failure propagates to the Nexus caller. + return await client.typedActivity().startActivity('throwAnError', { + id: randomUUID(), + args: [true, message], + scheduleToCloseTimeout: '10s', + }); + }, + }), + }), + ], + }); + + await worker.runUntil(async () => { + const nexusSvc = client.nexus.createServiceClient({ endpoint: endpointName, service: temporalOpService }); + const err = await t.throwsAsync( + nexusSvc.executeOperation(temporalOpService.operations.failingActivity, 'activity failed', { + id: randomUUID(), + }), + { instanceOf: NexusOperationFailureError } + ); + + assert(err?.cause instanceof ApplicationFailure); + const activityFailure = err.cause.cause; + assert(activityFailure instanceof ApplicationFailure); + t.is(activityFailure.message, 'activity failed'); + t.is(activityFailure.type, 'Error'); + t.true(activityFailure.nonRetryable); + }); +}); + test('TemporalOperationHandler cancels backing activity', async (t) => { const { createWorker, registerNexusEndpoint } = helpers(t); const { endpointName } = await registerNexusEndpoint(); @@ -725,8 +865,6 @@ test('TemporalOperationHandler invokes custom cancelActivity', async (t) => { return result; }, async cancelActivity(ctx, { activityId, runId }) { - console.log('activityId', activityId); - console.log('runId', runId); const handle = temporalnexus.getClient().activity.getHandle(activityId, runId); await handle.cancel('test custom cancellation'); customCancelCalled = true; From d4144a7714babe172f48885caf407b2cdfc71364 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Thu, 11 Jun 2026 16:45:23 -0700 Subject: [PATCH 12/12] Address PR feedback --- packages/nexus/src/token.ts | 6 +++--- packages/nexus/src/workflow-helpers.ts | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/packages/nexus/src/token.ts b/packages/nexus/src/token.ts index cd68a93a6b..c9b7ed6e57 100644 --- a/packages/nexus/src/token.ts +++ b/packages/nexus/src/token.ts @@ -103,7 +103,7 @@ export function generateActivityOperationToken(namespace: string, activityId: st } /** - * Encode an OPerationToken as a string. + * Encode an OperationToken as a string. */ export function encodeOperationToken(token: OperationToken): string { return base64URLEncodeNoPadding(JSON.stringify(token)); @@ -179,10 +179,10 @@ export function assertActivityOperationToken(token: OperationToken): asserts tok throw new TypeError(`invalid activity token type: ${token.t}, expected: ${OperationTokenType.ACTIVITY}`); } if (!token.aid || typeof token.aid !== 'string') { - throw new TypeError('invalid activity token: missing activity ID (aid)'); + throw new TypeError('invalid activity token: missing or invalid activity ID (aid)'); } if (!token.rid || typeof token.rid !== 'string') { - throw new TypeError('invalid activity token: missing activity run ID (rid)'); + throw new TypeError('invalid activity token: missing or invalid activity run ID (rid)'); } } diff --git a/packages/nexus/src/workflow-helpers.ts b/packages/nexus/src/workflow-helpers.ts index 35fd4872b0..b224b5f505 100644 --- a/packages/nexus/src/workflow-helpers.ts +++ b/packages/nexus/src/workflow-helpers.ts @@ -148,8 +148,8 @@ export async function startWorkflow( workflowTypeOrFunc: string | T, workflowOptions: WorkflowStartOptions ): Promise>> { - const { client, taskQueue } = getHandlerContext(); - const token = generateWorkflowRunOperationToken(client.options.namespace, workflowOptions.workflowId); + const { client, taskQueue, namespace } = getHandlerContext(); + const token = generateWorkflowRunOperationToken(namespace, workflowOptions.workflowId); return await startWithNexusOptions(ctx, token, async (internalOptions) => { const { taskQueue: userSpecifiedTaskQueue, ...rest } = workflowOptions; const startOptions: ClientWorkflowStartOptions = { @@ -230,14 +230,14 @@ async function startActivity( activity: string, activityOptions: ActivityOptions ): Promise> { - const { client, taskQueue } = getHandlerContext(); + const { client, taskQueue, namespace } = getHandlerContext(); // Activity tokens included in nexus callback headers cannot have // run ID as it's unknown until the activity has been started so we // manually construct the token. const token = encodeOperationToken({ t: OperationTokenType.ACTIVITY, - ns: client.options.namespace, + ns: namespace, aid: activityOptions.id, });