diff --git a/packages/client/src/activity-client.ts b/packages/client/src/activity-client.ts index 7bc83b26e..e295eb18e 100644 --- a/packages/client/src/activity-client.ts +++ b/packages/client/src/activity-client.ts @@ -1,14 +1,6 @@ import { randomUUID } from 'node:crypto'; import { status as grpcStatus } from '@grpc/grpc-js'; -import type { - ActivityFunction, - LoadedDataConverter, - Next, - Priority, - RetryPolicy, - SearchAttributePair, - TypedSearchAttributes, -} from '@temporalio/common'; +import type { LoadedDataConverter, Next } from '@temporalio/common'; import { compilePriority, compileRetryPolicy, @@ -16,7 +8,6 @@ import { decodePriority, decompileRetryPolicy, } from '@temporalio/common'; -import type { Duration } from '@temporalio/common/lib/time'; import { msOptionalToTs, msToNumber, optionalTsToDate, optionalTsToMs } from '@temporalio/common/lib/time'; import { composeInterceptors } from '@temporalio/common/lib/interceptors'; import { @@ -32,7 +23,7 @@ import { encodeUserMetadata, } from '@temporalio/common/lib/internal-non-workflow'; import { temporal } from '@temporalio/proto'; -import type { Replace } from '@temporalio/common/lib/type-helpers'; +import { filterNullAndUndefined } from '@temporalio/common/lib/internal-workflow'; import type { ActivityCancelInput, ActivityClientInterceptor, @@ -45,13 +36,7 @@ import type { } from './interceptors'; import type { AsyncCompletionClientOptions } from './async-completion-client'; import { AsyncCompletionClient } from './async-completion-client'; -import type { - ActivityExecutionDescription, - ActivityExecutionInfo, - ActivityIdConflictPolicy, - ActivityIdReusePolicy, - CountActivityExecutions, -} from './types'; +import type { ActivityExecutionDescription, ActivityExecutionInfo, CountActivityExecutions } from './types'; import { decodeActivityExecutionStatus, decodePendingActivityState, @@ -67,6 +52,8 @@ import { ActivityExecutionFailedError, ActivityExecutionAlreadyStartedError, } from './errors'; +import { type InternalActivityStartOptions, InternalNexusStartOptionsSymbol } from './internal'; +import type { ActivityOptions, ActivityOptionsFor, ActivityResult, ActivityName } from './activity-options'; /** * Options used to configure {@link ActivityClient} @@ -256,10 +243,17 @@ export class ActivityClient extends AsyncCompletionClient implements TypedActivi } validateActivityOptions(input.options); + const internalOptions = (input.options as InternalActivityStartOptions)[InternalNexusStartOptionsSymbol]; + try { const resp = await this.workflowService.startActivityExecution( await this.buildStartActivityExecutionRequest(input) ); + + if (internalOptions != null) { + internalOptions.backLink = resp.link ?? undefined; + } + return this.createHandle(input.options.id, resp.runId); } catch (err) { if (isGrpcServiceError(err) && err.code === grpcStatus.ALREADY_EXISTS) { @@ -290,6 +284,8 @@ export class ActivityClient extends AsyncCompletionClient implements TypedActivi ? { indexedFields: encodeUnifiedSearchAttributes(undefined, input.options.typedSearchAttributes) } : undefined; + const internalOptions = (input.options as InternalActivityStartOptions)[InternalNexusStartOptionsSymbol]; + return { namespace: this.options.namespace, identity: this.options.identity, @@ -310,6 +306,7 @@ export class ActivityClient extends AsyncCompletionClient implements TypedActivi userMetadata: await encodeUserMetadata(this.dataConverter, input.options.summary, undefined), priority: input.options.priority ? compilePriority(input.options.priority) : undefined, startDelay: msOptionalToTs(input.options.startDelay), + ...filterNullAndUndefined(internalOptions ?? {}), }; } @@ -362,7 +359,7 @@ export class ActivityClient extends AsyncCompletionClient implements TypedActivi activityId: input.activityId, runId: input.activityRunId || undefined, }); - return buildActivityDescription(resp.info!, this.dataConverter); + return buildActivityDescription(resp.info!, resp.callbacks, this.dataConverter); } catch (err) { this.rethrowGrpcError(err, 'Failed to describe activity'); } @@ -495,77 +492,6 @@ export interface ActivityHandle { terminate(reason: string): Promise; } -/** - * Options used by {@link ActivityClient.start}. - * - * @experimental Standalone Activities are experimental. APIs may be subject to change. - */ -export interface ActivityOptions { - /** - * Activity ID of the started activity. It's recommended to use a meaningful business ID. - */ - id: string; - /** - * Task queue to run this activity on. - */ - taskQueue: string; - /** - * Input arguments to pass to the activity. - */ - args?: any[] | Readonly; - /** - * If set, specifies maximum time between successful heartbeats. - */ - heartbeatTimeout?: Duration; - /** - * Controls how Activity is retried. If not set, the server will assign default retry policy. - */ - retry?: RetryPolicy; - /** - * Is set, specifies total time the activity is allowed to run, including retries. - * - * Note: it is required to set at least one of {@link startToCloseTimeout} and {@link scheduleToCloseTimeout}. - */ - startToCloseTimeout?: Duration; - /** - * If set, specifies maximum time the activity can wait in the task queue before being picked up by a worker. - * This timeout is non-retryable. - */ - scheduleToStartTimeout?: Duration; - /** - * If set, specifies maximum time for a single execution attempt. This timeout is retryable. - * - * Note: it is required to set at least one of {@link startToCloseTimeout} and {@link scheduleToCloseTimeout}. - */ - scheduleToCloseTimeout?: Duration; - /** - * A single-line fixed summary for this activity execution that may appear in UI/CLI. - * This can be in single-line Temporal markdown format. - */ - summary?: string; - /** - * Priority to use when starting this activity. - */ - priority?: Priority; - /** - * Time to wait before dispatching the first activity task. This delay is not applied to retry attempts. - */ - startDelay?: Duration; - /** - * Specifies behavior if there's a *closed* activity with the same ID. - */ - idReusePolicy?: ActivityIdReusePolicy; - /** - * Specifies behavior if there's a *running* activity with the same ID. Note that there can only be one running - * Activity for each Activity ID. - */ - idConflictPolicy?: ActivityIdConflictPolicy; - /** - * Search attributes for the activity. - */ - typedSearchAttributes?: SearchAttributePair[] | TypedSearchAttributes; -} - function validateActivityOptions(options: ActivityOptions): void { if (!options.id) { throw new TypeError('id is required'); @@ -606,6 +532,7 @@ function buildActivityExecutionInfo(info: temporal.api.activity.v1.IActivityExec function buildActivityDescription( info: temporal.api.activity.v1.IActivityExecutionInfo, + callbacks: temporal.api.activity.v1.ICallbackInfo[], dataConverter: LoadedDataConverter ): ActivityExecutionDescription { const getHeartbeatDetails: () => Promise = async () => { @@ -624,6 +551,7 @@ function buildActivityDescription( return { ...buildActivityExecutionInfoCommonPart(info), rawInfo: info, + rawCallbacks: callbacks, runState: decodePendingActivityState(info.runState), scheduleToCloseTimeoutMs: optionalTsToMs(info.scheduleToCloseTimeout), scheduleToStartTimeoutMs: optionalTsToMs(info.scheduleToStartTimeout), @@ -663,71 +591,3 @@ export interface TypedActivityClient { execute>(activity: N, options: ActivityOptionsFor): Promise>; } - -/** - * Utility type to support strong typing in {@link TypedActivityClient}. - * Contains names of activities extracted from the specified activity interface. - * @template T Activity interface - * - * @experimental Standalone Activities are experimental. APIs may be subject to change. - */ -export type ActivityName = { - [N in keyof T & string]: T[N] extends ActivityFunction ? N : never; -}[keyof T & string]; - -/** - * Utility type to support strong typing in {@link TypedActivityClient}. - * Extracts argument types of an activity. - * @template T Activity interface - * @template N Activity name - * - * @experimental Standalone Activities are experimental. APIs may be subject to change. - */ -export type ActivityArgs> = T[N] extends ActivityFunction ? P : never; - -/** - * Utility type to support strong typing in {@link TypedActivityClient}. - * Extracts result type of an activity. - * @template T Activity interface - * @template N Activity name - * - * @experimental Standalone Activities are experimental. APIs may be subject to change. - */ -export type ActivityResult> = T[N] extends ActivityFunction ? R : never; - -/** - * Utility type to support strong typing in {@link TypedActivityClient}. - * Represents {@link ActivityOptions} with strongly typed arguments. - * @template Args Types of activity arguments as an array type. - * - * @experimental Standalone Activities are experimental. APIs may be subject to change. - */ -export type ActivityOptionsWithArgs = Args extends [any, ...any] - ? Replace< - ActivityOptions, - { - /** - * Arguments to pass to the Activity - */ - args: Args | Readonly; - } - > - : Replace< - ActivityOptions, - { - /** - * Arguments to pass to the Activity - */ - args?: Args | Readonly; - } - >; - -/** - * Utility type to support strong typing in {@link TypedActivityClient}. - * Represents {@link ActivityOptions} with strongly typed arguments matching specified Activity in specified interface. - * @template T Activity interface - * @template N Activity name - * - * @experimental Standalone Activities are experimental. APIs may be subject to change. - */ -export type ActivityOptionsFor> = ActivityOptionsWithArgs>; diff --git a/packages/client/src/activity-options.ts b/packages/client/src/activity-options.ts new file mode 100644 index 000000000..d1b5203ca --- /dev/null +++ b/packages/client/src/activity-options.ts @@ -0,0 +1,149 @@ +import type { + ActivityFunction, + Priority, + RetryPolicy, + SearchAttributePair, + TypedSearchAttributes, +} from '@temporalio/common'; +import type { Duration } from '@temporalio/common/lib/time'; +import type { Replace } from '@temporalio/common/lib/type-helpers'; +import type { ActivityIdConflictPolicy, ActivityIdReusePolicy } from './types'; + +/** + * Options used by {@link ActivityClient.start}. + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export interface ActivityOptions { + /** + * Activity ID of the started activity. It's recommended to use a meaningful business ID. + */ + id: string; + /** + * Task queue to run this activity on. + */ + taskQueue: string; + /** + * Input arguments to pass to the activity. + */ + args?: any[] | Readonly; + /** + * If set, specifies maximum time between successful heartbeats. + */ + heartbeatTimeout?: Duration; + /** + * Controls how Activity is retried. If not set, the server will assign default retry policy. + */ + retry?: RetryPolicy; + /** + * Is set, specifies total time the activity is allowed to run, including retries. + * + * Note: it is required to set at least one of {@link startToCloseTimeout} and {@link scheduleToCloseTimeout}. + */ + startToCloseTimeout?: Duration; + /** + * If set, specifies maximum time the activity can wait in the task queue before being picked up by a worker. + * This timeout is non-retryable. + */ + scheduleToStartTimeout?: Duration; + /** + * If set, specifies maximum time for a single execution attempt. This timeout is retryable. + * + * Note: it is required to set at least one of {@link startToCloseTimeout} and {@link scheduleToCloseTimeout}. + */ + scheduleToCloseTimeout?: Duration; + /** + * A single-line fixed summary for this activity execution that may appear in UI/CLI. + * This can be in single-line Temporal markdown format. + */ + summary?: string; + /** + * Priority to use when starting this activity. + */ + priority?: Priority; + /** + * Time to wait before dispatching the first activity task. This delay is not applied to retry attempts. + */ + startDelay?: Duration; + /** + * Specifies behavior if there's a *closed* activity with the same ID. + */ + idReusePolicy?: ActivityIdReusePolicy; + /** + * Specifies behavior if there's a *running* activity with the same ID. Note that there can only be one running + * Activity for each Activity ID. + */ + idConflictPolicy?: ActivityIdConflictPolicy; + /** + * Search attributes for the activity. + */ + typedSearchAttributes?: SearchAttributePair[] | TypedSearchAttributes; +} + +/** + * Utility type to support strong typing in {@link TypedActivityClient}. + * Contains names of activities extracted from the specified activity interface. + * @template T Activity interface + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export type ActivityName = { + [N in keyof T & string]: T[N] extends ActivityFunction ? N : never; +}[keyof T & string]; + +/** + * Utility type to support strong typing in {@link TypedActivityClient}. + * Extracts argument types of an activity. + * @template T Activity interface + * @template N Activity name + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export type ActivityArgs> = T[N] extends ActivityFunction ? P : never; + +/** + * Utility type to support strong typing in {@link TypedActivityClient}. + * Extracts result type of an activity. + * @template T Activity interface + * @template N Activity name + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export type ActivityResult> = T[N] extends ActivityFunction ? R : never; + +/** + * Utility type to support strong typing in {@link TypedActivityClient}. + * Represents {@link ActivityOptions} with strongly typed arguments. + * @template Args Types of activity arguments as an array type. + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export type ActivityOptionsWithArgs = Args extends [any, ...any] + ? Replace< + ActivityOptions, + { + /** + * Arguments to pass to the Activity + */ + args: Args | Readonly; + } + > + : Replace< + ActivityOptions, + { + /** + * Arguments to pass to the Activity + */ + args?: Args | Readonly; + } + >; + +/** + * Utility type to support strong typing in {@link TypedActivityClient}. + * Represents {@link ActivityOptions} with strongly typed arguments matching specified Activity in specified interface. + * @template T Activity interface + * @template N Activity name + * + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export type ActivityOptionsFor> = ActivityOptionsWithArgs>; diff --git a/packages/client/src/index.ts b/packages/client/src/index.ts index 40e0fef99..e8c9857ab 100644 --- a/packages/client/src/index.ts +++ b/packages/client/src/index.ts @@ -55,6 +55,7 @@ export * from '@temporalio/common/lib/interfaces'; export * from '@temporalio/common/lib/workflow-handle'; export * from './async-completion-client'; export * from './activity-client'; +export * from './activity-options'; export * from './client'; export { Connection, diff --git a/packages/client/src/interceptors.ts b/packages/client/src/interceptors.ts index eae5e0366..7f4d2a5f4 100644 --- a/packages/client/src/interceptors.ts +++ b/packages/client/src/interceptors.ts @@ -26,7 +26,8 @@ import type { WorkflowExecution, } from './types'; import type { CompiledWorkflowOptions, WorkflowUpdateOptions } from './workflow-options'; -import type { ActivityHandle, ActivityOptions } from './activity-client'; +import type { ActivityHandle } from './activity-client'; +import type { ActivityOptions } from './activity-options'; export { Headers, Next }; diff --git a/packages/client/src/internal.ts b/packages/client/src/internal.ts index d981e6c04..bd46a8ba0 100644 --- a/packages/client/src/internal.ts +++ b/packages/client/src/internal.ts @@ -1,36 +1,37 @@ import type { temporal } from '@temporalio/proto'; import type { WorkflowOptions } from './workflow-options'; +import type { ActivityOptions } from './activity-options'; /** - * A symbol used to attach extra, SDK-internal options to the `WorkflowClient.start()` call. + * A symbol used to attach extra, SDK-internal options to the `WorkflowClient.start()` + * or `ActivityClient.start()` call. * * These are notably used by the Temporal Nexus helpers. * * @internal * @hidden */ -export const InternalWorkflowStartOptionsSymbol = Symbol.for('__temporal_internal_client_workflow_start_options'); -export interface InternalWorkflowStartOptions extends WorkflowOptions { - [InternalWorkflowStartOptionsSymbol]?: { +export const InternalNexusStartOptionsSymbol = Symbol.for('__temporal_internal_client_nexus_start_options'); +export interface InternalNexusStartOptions { + [InternalNexusStartOptionsSymbol]?: { /** - * Request ID to be used for the workflow. + * Request ID to be used for the start call. */ requestId?: string; /** * Callbacks to be called by the server when this workflow reaches a terminal state. - * If the workflow continues-as-new, these callbacks will be carried over to the new execution. * Callback addresses must be whitelisted in the server's dynamic configuration. */ completionCallbacks?: temporal.api.common.v1.ICallback[]; /** - * Links to be associated with the workflow. + * Links to be associated with the workflow or activity execution. */ links?: temporal.api.common.v1.ILink[]; /** - * Backlink copied by the client from the StartWorkflowExecutionResponse. + * Backlink copied by the client from the start response. * Only populated in servers newer than 1.27. */ backLink?: temporal.api.common.v1.ILink; @@ -38,8 +39,11 @@ export interface InternalWorkflowStartOptions extends WorkflowOptions { /** * Conflict options for when USE_EXISTING is specified. * - * Used by the Nexus WorkflowRunOperations to attach to a callback to a running workflow. + * Used by the Nexus Operations to attach to a callback to a running execution (workflow or activity). */ onConflictOptions?: temporal.api.workflow.v1.IOnConflictOptions; }; } + +export type InternalWorkflowStartOptions = WorkflowOptions & InternalNexusStartOptions; +export type InternalActivityStartOptions = ActivityOptions & InternalNexusStartOptions; diff --git a/packages/client/src/types.ts b/packages/client/src/types.ts index 0b94a76db..b87ecaea2 100644 --- a/packages/client/src/types.ts +++ b/packages/client/src/types.ts @@ -244,6 +244,11 @@ export type RawActivityExecutionInfo = proto.temporal.api.activity.v1.IActivityE */ export type RawActivityExecutionListInfo = proto.temporal.api.activity.v1.IActivityExecutionListInfo; +/** + * @experimental Standalone Activities are experimental. APIs may be subject to change. + */ +export type RawActivityExecutionCallbacks = proto.temporal.api.activity.v1.ICallbackInfo[]; + /** * Type of elements returned by {@link ActivityClient.list} * @@ -268,6 +273,7 @@ export interface ActivityExecutionInfo { * @experimental Standalone Activities are experimental. APIs may be subject to change. */ export interface ActivityExecutionDescription extends ActivityExecutionInfo { + rawCallbacks: RawActivityExecutionCallbacks; rawInfo: RawActivityExecutionInfo; runState?: PendingActivityState; scheduleToCloseTimeoutMs?: number; diff --git a/packages/client/src/workflow-client.ts b/packages/client/src/workflow-client.ts index 6e93602d4..74309df59 100644 --- a/packages/client/src/workflow-client.ts +++ b/packages/client/src/workflow-client.ts @@ -94,7 +94,7 @@ import { BaseClient, defaultBaseClientOptions } from './base-client'; import { mapAsyncIterable } from './iterators-utils'; import { WorkflowUpdateStage, encodeWorkflowUpdateStage } from './workflow-update-stage'; import type { InternalWorkflowStartOptions } from './internal'; -import { InternalWorkflowStartOptionsSymbol } from './internal'; +import { InternalNexusStartOptionsSymbol } from './internal'; import { adaptWorkflowClientInterceptor } from './interceptor-adapters'; const UpdateWorkflowExecutionLifecycleStage = temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage; @@ -1293,7 +1293,7 @@ export class WorkflowClient extends BaseClient { protected async _startWorkflowHandler(input: WorkflowStartInput): Promise { const req = await this.createStartWorkflowRequest(input); const { options: opts, workflowType } = input; - const internalOptions = (opts as InternalWorkflowStartOptions)[InternalWorkflowStartOptionsSymbol]; + const internalOptions = (opts as InternalWorkflowStartOptions)[InternalNexusStartOptionsSymbol]; try { const response = await this.workflowService.startWorkflowExecution(req); if (internalOptions != null) { @@ -1320,7 +1320,7 @@ export class WorkflowClient extends BaseClient { const { identity, namespace } = this.options; const dataConverter = this.dataConverter; const context = this.workflowSerializationContext(opts.workflowId); - const internalOptions = (opts as InternalWorkflowStartOptions)[InternalWorkflowStartOptionsSymbol]; + const internalOptions = (opts as InternalWorkflowStartOptions)[InternalNexusStartOptionsSymbol]; const supportsEagerStart = (this.connection as InternalConnectionLike)?.[InternalConnectionLikeSymbol] ?.supportsEagerStart; diff --git a/packages/nexus/src/__tests__/test-nexus-token-helpers.ts b/packages/nexus/src/__tests__/test-nexus-token-helpers.ts index 166280681..bce7abac6 100644 --- a/packages/nexus/src/__tests__/test-nexus-token-helpers.ts +++ b/packages/nexus/src/__tests__/test-nexus-token-helpers.ts @@ -1,6 +1,8 @@ import test from 'ava'; import { + assertActivityOperationToken, base64URLEncodeNoPadding, + generateActivityOperationToken, generateWorkflowRunOperationToken, loadOperationToken, loadWorkflowRunOperationToken, @@ -17,6 +19,19 @@ test('encode and decode workflow run Operation token', (t) => { t.deepEqual(decoded, expected); }); +test('encode and decode activity Operation token', (t) => { + const expected = { + t: 2, + ns: 'ns', + aid: 'a', + rid: 'r', + }; + const token = generateActivityOperationToken('ns', 'a', 'r'); + const decoded = loadOperationToken(token); + assertActivityOperationToken(decoded); + t.deepEqual(decoded, expected); +}); + test('decode Operation token errors', (t) => { t.throws(() => loadOperationToken(''), { message: /invalid operation token: token is empty/ }); @@ -34,12 +49,11 @@ test('decode Operation token errors', (t) => { }); test('decode workflow run Operation token errors', (t) => { - const invalidTypeToken = base64URLEncodeNoPadding('{"t":2,"ns":"ns"}'); + const invalidTypeToken = base64URLEncodeNoPadding('{"t":99,"ns":"ns"}'); t.throws(() => loadWorkflowRunOperationToken(invalidTypeToken), { // This currently fails on unknown token type as there are no other existing token types. // When new token types are added this regex will need to be updated to - // /invalid workflow token type: 2/ - message: /invalid operation token: unknown token type: 2/, + message: /invalid operation token: unknown token type: 99/, }); const missingWIDToken = base64URLEncodeNoPadding('{"t":1,"ns":"ns"}'); @@ -47,3 +61,22 @@ test('decode workflow run Operation token errors', (t) => { message: /invalid workflow run token: missing workflow ID \(wid\)/, }); }); + +test('decode activity Operation token errors', (t) => { + const missingAIDToken = base64URLEncodeNoPadding('{"t":2,"ns":"ns","rid":"r"}'); + t.throws(() => assertActivityOperationToken(loadOperationToken(missingAIDToken)), { + message: /invalid activity token: missing activity ID \(aid\)/, + }); + + const missingRIDToken = base64URLEncodeNoPadding('{"t":2,"ns":"ns","aid":"a"}'); + t.throws(() => assertActivityOperationToken(loadOperationToken(missingRIDToken)), { + message: /invalid activity token: missing activity run ID \(rid\)/, + }); +}); + +test('loadWorkflowRunOperationToken rejects activity token', (t) => { + const activityToken = generateActivityOperationToken('ns', 'a', 'r'); + t.throws(() => loadWorkflowRunOperationToken(activityToken), { + message: /invalid workflow token type: 2, expected: 1/, + }); +}); diff --git a/packages/nexus/src/index.ts b/packages/nexus/src/index.ts index 96f720bae..e182458c9 100644 --- a/packages/nexus/src/index.ts +++ b/packages/nexus/src/index.ts @@ -17,11 +17,15 @@ export { export { startWorkflow, + type ActivityOptions as ActivityStartOptions, + type ActivityOptionsFor as ActivityStartOptionsFor, + type CancelActivityOptions, type CancelWorkflowRunOptions, type TemporalOperationHandlerOptions, TemporalOperationHandler, TemporalOperationResult, type TemporalNexusClient, + type NexusTypedActivityClient, type TemporalOperationStartHandler, WorkflowHandle, WorkflowRunOperationHandler, diff --git a/packages/nexus/src/token.ts b/packages/nexus/src/token.ts index 486fe36cb..c9b7ed6e5 100644 --- a/packages/nexus/src/token.ts +++ b/packages/nexus/src/token.ts @@ -22,9 +22,19 @@ export interface OperationToken { ns: string; /** - * ID of the workflow. + * ID of a workflow for OperationTokenType.WORKFLOW_RUN. */ wid?: string; + + /** + * ID of an activity for OperationTokenType.ACTIVITY. + */ + aid?: string; + + /** + * Run ID of an activity for OperationTokenType.ACTIVITY. + */ + rid?: string; } /** @@ -38,9 +48,20 @@ export interface WorkflowRunOperationToken extends OperationToken { wid: string; } +/** + * An OperationToken that identifies an Activity operation. + * + * @internal + * @hidden + */ +export interface ActivityOperationToken extends OperationToken { + t: typeof OperationTokenType.ACTIVITY; + aid: string; + rid: string; +} + /** * OperationTokenType is used to identify the type of Operation token. - * Currently, we only have one type of Operation token: WorkflowRun. * * @internal * @hidden @@ -53,6 +74,7 @@ export type OperationTokenType = (typeof OperationTokenType)[keyof typeof Operat */ export const OperationTokenType = { WORKFLOW_RUN: 1, + ACTIVITY: 2, } as const; /** @@ -64,6 +86,26 @@ export function generateWorkflowRunOperationToken(namespace: string, workflowId: ns: namespace, wid: workflowId, }; + return encodeOperationToken(token); +} + +/** + * Generate an activity Operation token. + */ +export function generateActivityOperationToken(namespace: string, activityId: string, runId: string): string { + const token: ActivityOperationToken = { + t: OperationTokenType.ACTIVITY, + ns: namespace, + aid: activityId, + rid: runId, + }; + return encodeOperationToken(token); +} + +/** + * Encode an OperationToken as a string. + */ +export function encodeOperationToken(token: OperationToken): string { return base64URLEncodeNoPadding(JSON.stringify(token)); } @@ -129,6 +171,21 @@ export function assertWorkflowRunOperationToken(token: OperationToken): asserts } } +/** + * Assert that an OperationToken identifies an activity. + */ +export function assertActivityOperationToken(token: OperationToken): asserts token is ActivityOperationToken { + if (token.t !== OperationTokenType.ACTIVITY) { + throw new TypeError(`invalid activity token type: ${token.t}, expected: ${OperationTokenType.ACTIVITY}`); + } + if (!token.aid || typeof token.aid !== 'string') { + throw new TypeError('invalid activity token: missing or invalid activity ID (aid)'); + } + if (!token.rid || typeof token.rid !== 'string') { + throw new TypeError('invalid activity token: missing or invalid activity run ID (rid)'); + } +} + function isOperationTokenType(value: number): value is OperationTokenType { return Object.values(OperationTokenType).includes(value as OperationTokenType); } diff --git a/packages/nexus/src/workflow-helpers.ts b/packages/nexus/src/workflow-helpers.ts index 967bd96c8..b224b5f50 100644 --- a/packages/nexus/src/workflow-helpers.ts +++ b/packages/nexus/src/workflow-helpers.ts @@ -1,17 +1,28 @@ import * as nexus from 'nexus-rpc'; import type { Workflow, WorkflowResultType } from '@temporalio/common'; import type { Replace } from '@temporalio/common/lib/type-helpers'; -import type { Client, WorkflowStartOptions as ClientWorkflowStartOptions } from '@temporalio/client'; +import type { + ActivityHandle, + ActivityName, + ActivityOptions as ClientActivityOptions, + ActivityOptionsFor as ClientActivityOptionsFor, + ActivityResult, + Client, + WorkflowStartOptions as ClientWorkflowStartOptions, +} from '@temporalio/client'; import { type temporal } from '@temporalio/proto'; -import type { InternalWorkflowStartOptions } from '@temporalio/client/lib/internal'; -import { InternalWorkflowStartOptionsSymbol } from '@temporalio/client/lib/internal'; +import type { InternalActivityStartOptions, InternalNexusStartOptions } from '@temporalio/client/lib/internal'; +import { InternalNexusStartOptionsSymbol } from '@temporalio/client/lib/internal'; import { convertNexusLinkToTemporalLink, convertTemporalLinkToNexusLink } from './link-converter'; import { + assertActivityOperationToken, assertWorkflowRunOperationToken, generateWorkflowRunOperationToken, + generateActivityOperationToken, loadOperationToken, loadWorkflowRunOperationToken, OperationTokenType, + encodeOperationToken, } from './token'; import { getClient, @@ -24,6 +35,59 @@ import { declare const isNexusWorkflowHandle: unique symbol; declare const workflowResultType: unique symbol; +async function startWithNexusOptions( + ctx: nexus.StartOperationContext, + token: string, + handler: (opts: InternalNexusStartOptions[typeof InternalNexusStartOptionsSymbol]) => Promise +) { + const links = Array(); + if (ctx.inboundLinks?.length > 0) { + for (const l of ctx.inboundLinks) { + try { + links.push(convertNexusLinkToTemporalLink(l)); + } catch (error) { + log.warn('failed to convert Nexus link to Workflow event link', { error }); + } + } + } + + const internalOptions: InternalNexusStartOptions[typeof InternalNexusStartOptionsSymbol] = { + links, + requestId: ctx.requestId, + onConflictOptions: { + attachLinks: true, + attachCompletionCallbacks: true, + attachRequestId: true, + }, + }; + + // Add nexus-operation-token header to solve for race between Workflow completion + // and Nexus Operation start recording + const callbackHeaders = { + ...ctx.callbackHeaders, + 'nexus-operation-token': token, + }; + if (ctx.callbackUrl) { + internalOptions.completionCallbacks = [ + { + nexus: { url: ctx.callbackUrl, header: callbackHeaders }, + links, // pass in links here as well for older servers, newer servers dedupe them. + }, + ]; + } + + const result = await handler(internalOptions); + if (internalOptions.backLink != null) { + try { + ctx.outboundLinks.push(convertTemporalLinkToNexusLink(internalOptions.backLink)); + } catch (error) { + log.warn('failed to convert temporal link to Nexus link', { error }); + } + } + + return result; +} + /** * A handle to a running workflow that is returned by the {@link startWorkflow} helper. * This handle should be returned by {@link WorkflowRunOperationStartHandler} implementations. @@ -84,64 +148,23 @@ export async function startWorkflow( workflowTypeOrFunc: string | T, workflowOptions: WorkflowStartOptions ): Promise>> { - const { client, taskQueue } = getHandlerContext(); - const links = Array(); - if (ctx.inboundLinks?.length > 0) { - for (const l of ctx.inboundLinks) { - try { - links.push(convertNexusLinkToTemporalLink(l)); - } catch (error) { - log.warn('failed to convert Nexus link to Workflow event link', { error }); - } - } - } - const internalOptions: InternalWorkflowStartOptions[typeof InternalWorkflowStartOptionsSymbol] = { - links, - requestId: ctx.requestId, - }; - - internalOptions.onConflictOptions = { - attachLinks: true, - attachCompletionCallbacks: true, - attachRequestId: true, - }; - - // Add nexus-operation-token header to solve for race between Workflow completion - // and Nexus Operation start recording - const callbackHeaders = { - ...ctx.callbackHeaders, - 'nexus-operation-token': generateWorkflowRunOperationToken(client.options.namespace, workflowOptions.workflowId), - }; - - if (ctx.callbackUrl) { - internalOptions.completionCallbacks = [ - { - nexus: { url: ctx.callbackUrl, header: callbackHeaders }, - links, // pass in links here as well for older servers, newer servers dedupe them. - }, - ]; - } + const { client, taskQueue, namespace } = getHandlerContext(); + const token = generateWorkflowRunOperationToken(namespace, workflowOptions.workflowId); + return await startWithNexusOptions(ctx, token, async (internalOptions) => { + const { taskQueue: userSpecifiedTaskQueue, ...rest } = workflowOptions; + const startOptions: ClientWorkflowStartOptions = { + ...rest, + taskQueue: userSpecifiedTaskQueue || taskQueue, + [InternalNexusStartOptionsSymbol]: internalOptions, + }; - const { taskQueue: userSpecifiedTaskQueue, ...rest } = workflowOptions; - const startOptions: ClientWorkflowStartOptions = { - ...rest, - taskQueue: userSpecifiedTaskQueue || taskQueue, - [InternalWorkflowStartOptionsSymbol]: internalOptions, - }; + const handle = await client.workflow.start(workflowTypeOrFunc, startOptions); - const handle = await client.workflow.start(workflowTypeOrFunc, startOptions); - if (internalOptions.backLink != null) { - try { - ctx.outboundLinks.push(convertTemporalLinkToNexusLink(internalOptions.backLink)); - } catch (error) { - log.warn('failed to convert temporal link to Nexus link', { error }); - } - } - - return { - workflowId: handle.workflowId, - runId: handle.firstExecutionRunId, - } as WorkflowHandle>; + return { + workflowId: handle.workflowId, + runId: handle.firstExecutionRunId, + } as WorkflowHandle>; + }); } /** @@ -174,6 +197,75 @@ export class WorkflowRunOperationHandler implements nexus.OperationHandler } } +/** + * Options for starting an untyped activity using {@link TemporalNexusClient.startActivity}, this type is identical to the + * client's `ActivityOptions` with the exception that `taskQueue` is optional and defaults + * to the current worker's task queue. + * + * @experimental Nexus support in Temporal SDK is experimental. + */ +export type ActivityOptions = Replace; + +/** + * Options for starting a typed activity using {@link NexusTypedActivityClient.startActivity}, this type is identical to the + * client's `ActivityOptionsFor` with the exception that `taskQueue` is optional and defaults + * to the current worker's task queue. + * + * @experimental Nexus support in Temporal SDK is experimental. + */ +export type ActivityOptionsFor> = Replace< + ClientActivityOptionsFor, + { taskQueue?: string } +>; + +/** + * Starts an activity for a {@link TemporalNexusClient.startActivity}, linking the execution chain + * to a Nexus Operation. Automatically propagates the callback, request ID, and back and forward + * links from the Nexus options to the Activity. + * + * @experimental Nexus support in Temporal SDK is experimental. + */ +async function startActivity( + ctx: TemporalStartOperationContext, + activity: string, + activityOptions: ActivityOptions +): Promise> { + const { client, taskQueue, namespace } = getHandlerContext(); + + // Activity tokens included in nexus callback headers cannot have + // run ID as it's unknown until the activity has been started so we + // manually construct the token. + const token = encodeOperationToken({ + t: OperationTokenType.ACTIVITY, + ns: namespace, + aid: activityOptions.id, + }); + + return await startWithNexusOptions(ctx, token, async (internalOptions) => { + const { taskQueue: userSpecifiedTaskQueue, ...rest } = activityOptions; + const startOptions: InternalActivityStartOptions = { + ...rest, + taskQueue: userSpecifiedTaskQueue || taskQueue, + [InternalNexusStartOptionsSymbol]: internalOptions, + }; + + try { + return await client.activity.start(activity, startOptions); + } catch (err) { + // ActivityClient.validateActivityOptions throws TypeError if there are bad options specified. + // e.g. missing startToCloseTimeout or scheduleToCloseTimeout + // If uncaught, causes Nexus operations to retry until timeout or circuit breaker trips and + // reason is not easily discoverable. + if (err instanceof TypeError) { + throw new nexus.HandlerError(nexus.HandlerErrorType.BAD_REQUEST, `Failed to start activity: ${err.message}`, { + cause: err, + }); + } + throw err; + } + }); +} + /** * Module-private brand and payload key for {@link TemporalOperationResult}. */ @@ -225,6 +317,34 @@ export interface TemporalNexusClient { workflowTypeOrFunc: string | T, workflowOptions: WorkflowStartOptions ): Promise>>; + + startActivity(activity: string, options: ActivityOptions): Promise>; + + /** + * Returns this client as a {@link NexusTypedActivityClient}. It enables strong type checking of Activity name, arguments + * and result based on the provided Activity interface. Note that no new client object is created - this method only + * affects type annotations. + * @template T Activity interface to use for type checking. The returned client can only start activities present in + * this interface. + * + * @experimental Nexus support in Temporal SDK is experimental. + */ + typedActivity(): NexusTypedActivityClient; +} + +/** + * Sub-interface of {@link TemporalNexusClient} that provides a strongly-typed interface for executing Activities. + * Argument types in the provided options must match the argument types of the specified Activity as defined in provided + * interface + * @template T Activity interface + * + * @experimental Nexus support in Temporal SDK is experimental. + */ +export interface NexusTypedActivityClient { + startActivity>( + activity: N, + options: ActivityOptionsFor + ): Promise>>; } class TemporalNexusClientImpl implements TemporalNexusClient { @@ -257,6 +377,25 @@ class TemporalNexusClientImpl implements TemporalNexusClient { }); } + public async startActivity( + activity: string, + options: Replace + ): Promise> { + return await this.withAsyncOperationStartReservation(async () => { + const handle = await startActivity(this.startOperationContext, activity, options); + const { namespace } = getHandlerContext(); + + // handle.runId is always populated when starting an activity + // it's safe to use non-null assertion + const token = generateActivityOperationToken(namespace, handle.activityId, handle.runId!); + return TemporalOperationResult.async(token); + }); + } + + public typedActivity(): NexusTypedActivityClient { + return this; + } + private async withAsyncOperationStartReservation(fn: () => Promise): Promise { if (this.asyncOperationStarted) { throw new nexus.HandlerError( @@ -299,6 +438,24 @@ export interface CancelWorkflowRunOptions { readonly workflowId: string; } +/** + * Options passed to a {@link TemporalOperationHandlerOptions.cancelActivity} handler describing + * the activity to cancel. + * + * @experimental Nexus support in Temporal SDK is experimental. + */ +export interface CancelActivityOptions { + /** + * The ID of the activity backing the Nexus Operation that is being canceled. + */ + readonly activityId: string; + + /** + * The run ID of the activity backing the Nexus Operation that is being canceled. + */ + readonly runId: string; +} + /** * Options for customizing a {@link TemporalOperationHandler}. * @@ -306,6 +463,7 @@ export interface CancelWorkflowRunOptions { */ export interface TemporalOperationHandlerOptions { cancelWorkflowRun?: (ctx: TemporalCancelOperationContext, options: CancelWorkflowRunOptions) => Promise; + cancelActivity?: (ctx: TemporalCancelOperationContext, options: CancelActivityOptions) => Promise; } /** @@ -316,10 +474,12 @@ export interface TemporalOperationHandlerOptions { export class TemporalOperationHandler implements nexus.OperationHandler { private readonly startHandler: TemporalOperationStartHandler; private readonly cancelWorkflowRunHandler: NonNullable; + private readonly cancelActivityHandler: NonNullable; constructor(options: { start: TemporalOperationStartHandler } & TemporalOperationHandlerOptions) { this.startHandler = options.start; this.cancelWorkflowRunHandler = options.cancelWorkflowRun ?? defaultCancelWorkflowRun; + this.cancelActivityHandler = options.cancelActivity ?? defaultCancelActivity; } async start(ctx: nexus.StartOperationContext, input: I): Promise> { @@ -346,6 +506,16 @@ export class TemporalOperationHandler implements nexus.OperationHandler implements nexus.OperationHandler { }); }); -test('WorkflorRunOperation workflow run has Nexus-Operation-Token Header', async (t) => { +test('WorkflowRunOperation workflow run has Nexus-Operation-Token Header', async (t) => { const { createWorker, registerNexusEndpoint } = helpers(t); const { endpointName } = await registerNexusEndpoint(); const { handler, workflowStarted } = makeTestHandler(); diff --git a/packages/test/src/test-nexus-temporal-operation.ts b/packages/test/src/test-nexus-temporal-operation.ts index 52cf8dcfe..f5220c6c5 100644 --- a/packages/test/src/test-nexus-temporal-operation.ts +++ b/packages/test/src/test-nexus-temporal-operation.ts @@ -1,19 +1,27 @@ import assert from 'assert'; import { randomUUID } from 'crypto'; import * as nexus from 'nexus-rpc'; -import { NexusOperationFailure } from '@temporalio/common'; +import { ApplicationFailure, NexusOperationFailure } from '@temporalio/common'; import { NexusOperationExecutionStatus, + NexusOperationFailureError, WorkflowExecutionAlreadyStartedError, WorkflowFailedError, } from '@temporalio/client'; import * as temporalnexus from '@temporalio/nexus'; import { asyncLocalStorage } from '@temporalio/nexus/lib/context'; -import { base64URLEncodeNoPadding, generateWorkflowRunOperationToken } from '@temporalio/nexus/lib/token'; +import { + base64URLEncodeNoPadding, + encodeOperationToken, + generateWorkflowRunOperationToken, + OperationTokenType, +} from '@temporalio/nexus/lib/token'; import * as workflow from '@temporalio/workflow'; +import { Context } from '@temporalio/activity'; import { helpers, makeTestFunction } from './helpers-integration'; import { innermostHandlerError } from './helpers-nexus'; import { waitUntil } from './helpers'; +import { echo, throwAnError } from './activities'; const test = makeTestFunction({ workflowsPath: __filename, @@ -39,6 +47,9 @@ const temporalOpService = nexus.service('temporalOperationService', { syncOp: nexus.operation(), doubleStartOp: nexus.operation(), retryAfterFailedStartOp: nexus.operation(), + echoActivity: nexus.operation(), + failingActivity: nexus.operation(), + blockingActivity: nexus.operation(), }); const temporalCancelOpService = nexus.service('temporalCancelOperationService', { @@ -62,6 +73,9 @@ function makeTemporalOpServiceHandler(overrides: Partial { await workflow.condition(() => false); } +//////////////////////////////////////////////////////////////////////////////////////////////////// +// Activities + +const activities = { + echo, + throwAnError, + async waitForCancellation() { + const cx = Context.current(); + while (true) { + await cx.sleep(300); + await cx.heartbeat(); + } + }, +}; + //////////////////////////////////////////////////////////////////////////////////////////////////// // Tests @@ -157,6 +186,58 @@ test('TemporalOperationHandler infers correct output type from typed workflow fu t.pass(); }); +test('TemporalOperationHandler infers correct output type from typed activity', async (t) => { + // echo(message?: string): Promise, so typedActivity().startActivity('echo', ...) + // resolves to TemporalOperationResult and the operation output type infers as string. + const _activityStringOp: nexus.OperationHandler = new temporalnexus.TemporalOperationHandler({ + async start(_ctx, client, input: string) { + return await client.typedActivity().startActivity('echo', { + id: 'test', + args: [input], + scheduleToCloseTimeout: '10s', + }); + }, + }); + + // @ts-expect-error - Output type should be string (echo returns string), not number + const _activityMismatchedOp: nexus.OperationHandler = new temporalnexus.TemporalOperationHandler({ + async start(_ctx, client, input: string) { + return await client.typedActivity().startActivity('echo', { + id: 'test', + args: [input], + scheduleToCloseTimeout: '10s', + }); + }, + }); + + const _activityWrongArgsOp: nexus.OperationHandler = new temporalnexus.TemporalOperationHandler({ + async start(_ctx, client, _input: string) { + return await client.typedActivity().startActivity('echo', { + id: 'test', + // @ts-expect-error - echo expects a string argument, not a number + args: [42], + scheduleToCloseTimeout: '10s', + }); + }, + }); + + const _explicitActivityStringOp: nexus.OperationHandler = new temporalnexus.TemporalOperationHandler< + string, + string + >({ + async start(_ctx, client, input) { + return await client.typedActivity().startActivity('echo', { + id: 'test', + args: [input], + scheduleToCloseTimeout: '10s', + }); + }, + }); + + // This test only checks for compile-time errors. + t.pass(); +}); + test('TemporalOperationHandler cancel delegates to provided cancelWorkflowRun handler', async (t) => { const { createWorker, registerNexusEndpoint } = helpers(t); const { endpointName } = await registerNexusEndpoint(); @@ -221,7 +302,7 @@ test('TemporalOperationHandler.cancel rejects invalid operation token type befor throw new Error('cancelWorkflowRun should not be called'); }, }); - const token = base64URLEncodeNoPadding(JSON.stringify({ t: 2, ns: 'test-namespace' })); + const token = base64URLEncodeNoPadding(JSON.stringify({ t: 99, ns: 'test-namespace' })); const err = await asyncLocalStorage.run( { @@ -250,6 +331,50 @@ test('TemporalOperationHandler.cancel rejects invalid operation token type befor t.regex(err?.message ?? '', /invalid operation token/); }); +test('TemporalOperationHandler.cancel rejects malformed activity token before invoking cancelActivity', async (t) => { + let cancelActivityCalled = false; + const handler = new temporalnexus.TemporalOperationHandler({ + async start(_ctx, _client, _input) { + return temporalnexus.TemporalOperationResult.sync(undefined); + }, + + async cancelActivity(_ctx, _options) { + cancelActivityCalled = true; + throw new Error('cancelActivity should not be called'); + }, + }); + const token = base64URLEncodeNoPadding(JSON.stringify({ t: OperationTokenType.ACTIVITY, ns: 'test-namespace' })); + + const err = await asyncLocalStorage.run( + { + client: undefined as any, + endpoint: 'test-endpoint', + namespace: 'test-namespace', + taskQueue: 'test-task-queue', + log: undefined as any, + metrics: undefined as any, + }, + async () => { + return await t.throwsAsync( + handler.cancel( + { + abortSignal: new AbortController().signal, + headers: {}, + operation: 'operation', + service: 'service', + }, + token + ) + ); + } + ); + + assert(err instanceof nexus.HandlerError); + t.is(err.type, 'BAD_REQUEST'); + t.regex(err.message, /invalid activity operation token/); + t.false(cancelActivityCalled, 'cancelActivity must not be invoked for a malformed activity token'); +}); + test('TemporalOperationHandler async and sync happy paths - caller workflow', async (t) => { const { createWorker, executeWorkflow, registerNexusEndpoint } = helpers(t); const { endpointName } = await registerNexusEndpoint(); @@ -287,47 +412,74 @@ test('TemporalOperationHandler async and sync happy paths - caller workflow', as }); }); -test('TemporalOperationHandler rejects multiple async starts', async (t) => { - const { createWorker, executeWorkflow, registerNexusEndpoint } = helpers(t); - const { endpointName } = await registerNexusEndpoint(); +// A single backing-operation start, as invoked from inside a start handler. The result is +// awaited then discarded because the parameterized guard test always throws afterwards. +type StartAction = (client: temporalnexus.TemporalNexusClient, input: string) => Promise; - const worker = await createWorker({ - nexusServices: [ - makeTemporalOpServiceHandler({ - doubleStartOp: new temporalnexus.TemporalOperationHandler({ - async start(_ctx, client, input) { - await client.startWorkflow(echoWorkflow, { - workflowId: randomUUID(), - args: [input], - }); - await client.startWorkflow(echoWorkflow, { - workflowId: randomUUID(), - args: [input], - }); - throw new nexus.HandlerError('INTERNAL', 'expected previous error to be thrown'); - }, - }), - }), - ], +const startWorkflowAction: StartAction = (client, input) => + client.startWorkflow(echoWorkflow, { + workflowId: randomUUID(), + args: [input], }); - await worker.runUntil(async () => { - const err = await t.throwsAsync( - () => - executeWorkflow(temporalDoubleStartOpCaller, { - args: [endpointName], +const startActivityAction: StartAction = (client, input) => + client.startActivity('echo', { + id: randomUUID(), + args: [input], + scheduleToCloseTimeout: '10s', + }); + +// The shared multiple-async-start guard (withAsyncOperationStartReservation) must reject a +// second backing-operation start regardless of which start kinds are combined. Each case's +// `name` becomes part of the test title and the derived Nexus endpoint name, so it must use +// only characters the endpoint-name transform sanitizes (letters/digits/spaces/parens/hyphens); +// avoid '+', '&', etc., which leak through and fail endpoint registration. Add a row to cover a +// new combination, or a new StartAction const to cover a new start kind. +const multipleAsyncStartCases: { name: string; first: StartAction; second: StartAction }[] = [ + { name: 'workflow then workflow', first: startWorkflowAction, second: startWorkflowAction }, + { name: 'activity then activity', first: startActivityAction, second: startActivityAction }, + { name: 'workflow then activity', first: startWorkflowAction, second: startActivityAction }, + { name: 'activity then workflow', first: startActivityAction, second: startWorkflowAction }, +]; + +for (const { name, first, second } of multipleAsyncStartCases) { + test(`TemporalOperationHandler rejects multiple async starts (${name})`, async (t) => { + const { createWorker, executeWorkflow, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + + const worker = await createWorker({ + activities, + nexusServices: [ + makeTemporalOpServiceHandler({ + doubleStartOp: new temporalnexus.TemporalOperationHandler({ + async start(_ctx, client, input) { + await first(client, input); + await second(client, input); // guard trips here + throw new nexus.HandlerError('INTERNAL', 'expected previous error to be thrown'); + }, + }), }), - { - instanceOf: WorkflowFailedError, - } - ); - assert(err?.cause instanceof NexusOperationFailure); - assert(err.cause.cause instanceof nexus.HandlerError); - const inner = innermostHandlerError(err.cause.cause); - t.is(inner.type, 'BAD_REQUEST'); - t.regex(inner.message, /Only one async operation can be started per operation handler invocation/); + ], + }); + + await worker.runUntil(async () => { + const err = await t.throwsAsync( + () => + executeWorkflow(temporalDoubleStartOpCaller, { + args: [endpointName], + }), + { + instanceOf: WorkflowFailedError, + } + ); + assert(err?.cause instanceof NexusOperationFailure); + assert(err.cause.cause instanceof nexus.HandlerError); + const inner = innermostHandlerError(err.cause.cause); + t.is(inner.type, 'BAD_REQUEST'); + t.regex(inner.message, /Only one async operation can be started per operation handler invocation/); + }); }); -}); +} test('TemporalOperationHandler allows retry after failed async start', async (t) => { const { createWorker, executeWorkflow, startWorkflow, registerNexusEndpoint } = helpers(t); @@ -453,3 +605,289 @@ test('TemporalOperationHandler workflow run has Nexus-Operation-Token Header', a t.is(opToken, generateWorkflowRunOperationToken(client.options.namespace, targetHandle.workflowId)); }); }); + +test('TemporalOperationHandler activity has Nexus-Operation-Token Header', async (t) => { + const { createWorker, registerNexusEndpoint } = helpers(t); + const { client } = t.context.env; + const { endpointName } = await registerNexusEndpoint(); + + const worker = await createWorker({ + activities, + nexusServices: [ + makeTemporalOpServiceHandler({ + asyncOp: new temporalnexus.TemporalOperationHandler({ + async start(_ctx, client, input) { + return await client.typedActivity().startActivity('echo', { + id: input, + args: [input], + scheduleToCloseTimeout: '10s', + }); + }, + }), + }), + ], + }); + + await worker.runUntil(async () => { + const targetActivityId = randomUUID(); + const nexusClient = client.nexus.createServiceClient({ endpoint: endpointName, service: temporalOpService }); + + const result = await nexusClient.executeOperation(temporalOpService.operations.asyncOp, targetActivityId, { + id: randomUUID(), + scheduleToCloseTimeout: '10s', + }); + t.is(result, targetActivityId); + + const targetHandle = client.activity.getHandle(targetActivityId); + const desc = await targetHandle.describe(); + + const expectedToken = encodeOperationToken({ + t: OperationTokenType.ACTIVITY, + ns: client.options.namespace, + aid: targetActivityId, + }); + const actualToken = desc.rawCallbacks?.[0].info?.callback?.nexus?.header?.['nexus-operation-token']; + t.is(actualToken, expectedToken); + }); +}); + +test('TemporalOperationHandler start typed standalone activity', async (t) => { + const { createWorker, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + const { client } = t.context.env; + + const worker = await createWorker({ + activities, + nexusServices: [ + makeTemporalOpServiceHandler({ + echoActivity: new temporalnexus.TemporalOperationHandler({ + async start(_ctx, client, input) { + return await client.typedActivity().startActivity('echo', { + id: randomUUID(), + args: [input], + scheduleToCloseTimeout: '10s', + }); + }, + }), + }), + ], + }); + + await worker.runUntil(async () => { + const nexusSvc = client.nexus.createServiceClient({ endpoint: endpointName, service: temporalOpService }); + const result = await nexusSvc.executeOperation(temporalOpService.operations.echoActivity, 'foo', { + id: randomUUID(), + }); + t.is(result, 'foo'); + }); +}); + +test('TemporalOperationHandler start untyped standalone activity', async (t) => { + const { createWorker, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + const { client } = t.context.env; + + const worker = await createWorker({ + activities, + nexusServices: [ + makeTemporalOpServiceHandler({ + echoActivity: new temporalnexus.TemporalOperationHandler({ + async start(_ctx, client, input) { + return await client.startActivity('echo', { + id: randomUUID(), + args: [input], + scheduleToCloseTimeout: '10s', + }); + }, + }), + }), + ], + }); + + await worker.runUntil(async () => { + const nexusSvc = client.nexus.createServiceClient({ endpoint: endpointName, service: temporalOpService }); + const result = await nexusSvc.executeOperation(temporalOpService.operations.echoActivity, 'foo', { + id: randomUUID(), + }); + t.is(result, 'foo'); + }); +}); + +test('TemporalOperationHandler converts invalid activity options to BAD_REQUEST', async (t) => { + const { createWorker, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + const { client } = t.context.env; + + const worker = await createWorker({ + activities, + nexusServices: [ + makeTemporalOpServiceHandler({ + echoActivity: new temporalnexus.TemporalOperationHandler({ + async start(_ctx, client, input) { + // Intentionally omit scheduleToCloseTimeout/startToCloseTimeout. Activity option + // validation throws a TypeError, which startActivity converts into a BAD_REQUEST + // HandlerError so the caller gets an actionable, non-retryable error instead of the + // operation silently retrying until it times out. + return await client.startActivity('echo', { + id: randomUUID(), + args: [input], + }); + }, + }), + }), + ], + }); + + await worker.runUntil(async () => { + const nexusSvc = client.nexus.createServiceClient({ endpoint: endpointName, service: temporalOpService }); + const err = await t.throwsAsync( + nexusSvc.executeOperation(temporalOpService.operations.echoActivity, 'foo', { + id: randomUUID(), + }), + { instanceOf: NexusOperationFailureError } + ); + assert(err?.cause instanceof nexus.HandlerError); + const inner = innermostHandlerError(err.cause); + t.is(inner.type, 'BAD_REQUEST'); + t.regex( + inner.message, + /Failed to start activity: Either scheduleToCloseTimeout or startToCloseTimeout is required/ + ); + }); +}); + +test('TemporalOperationHandler propagates backing activity failure', async (t) => { + const { createWorker, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + const { client } = t.context.env; + + const worker = await createWorker({ + activities, + nexusServices: [ + makeTemporalOpServiceHandler({ + failingActivity: new temporalnexus.TemporalOperationHandler({ + async start(_ctx, client, message) { + // throwAnError(true, message) throws a non-retryable ApplicationFailure, so the + // backing activity fails permanently and the failure propagates to the Nexus caller. + return await client.typedActivity().startActivity('throwAnError', { + id: randomUUID(), + args: [true, message], + scheduleToCloseTimeout: '10s', + }); + }, + }), + }), + ], + }); + + await worker.runUntil(async () => { + const nexusSvc = client.nexus.createServiceClient({ endpoint: endpointName, service: temporalOpService }); + const err = await t.throwsAsync( + nexusSvc.executeOperation(temporalOpService.operations.failingActivity, 'activity failed', { + id: randomUUID(), + }), + { instanceOf: NexusOperationFailureError } + ); + + assert(err?.cause instanceof ApplicationFailure); + const activityFailure = err.cause.cause; + assert(activityFailure instanceof ApplicationFailure); + t.is(activityFailure.message, 'activity failed'); + t.is(activityFailure.type, 'Error'); + t.true(activityFailure.nonRetryable); + }); +}); + +test('TemporalOperationHandler cancels backing activity', async (t) => { + const { createWorker, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + const { client } = t.context.env; + + let startedActivity = false; + + const worker = await createWorker({ + activities, + nexusServices: [ + makeTemporalOpServiceHandler({ + blockingActivity: new temporalnexus.TemporalOperationHandler({ + async start(_ctx, client, input) { + const result = await client.typedActivity().startActivity('waitForCancellation', { + id: input, + scheduleToCloseTimeout: '10s', + }); + startedActivity = true; + return result; + }, + }), + }), + ], + }); + + await worker.runUntil(async () => { + const nexusSvc = client.nexus.createServiceClient({ endpoint: endpointName, service: temporalOpService }); + const targetActivityId = `wait-for-cancel-${randomUUID()}`; + const handle = await nexusSvc.startOperation(temporalOpService.operations.blockingActivity, targetActivityId, { + id: randomUUID(), + }); + await waitUntil(async () => startedActivity, 4000); + + const activityHandle = client.activity.getHandle(targetActivityId); + + await handle.cancel(); + + await waitUntil(async () => (await handle.describe()).status === 'CANCELED', 4000); + await waitUntil(async () => (await activityHandle.describe()).status === 'CANCELED', 4000); + + // Assertions built into the waitUntils + t.pass(); + }); +}); + +test('TemporalOperationHandler invokes custom cancelActivity', async (t) => { + const { createWorker, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + const { client } = t.context.env; + + let activityStarted = false; + let customCancelCalled = false; + + const worker = await createWorker({ + activities, + nexusServices: [ + makeTemporalOpServiceHandler({ + blockingActivity: new temporalnexus.TemporalOperationHandler({ + async start(_ctx, client, input) { + const result = await client.typedActivity().startActivity('waitForCancellation', { + id: input, + scheduleToCloseTimeout: '10s', + }); + activityStarted = true; + return result; + }, + async cancelActivity(ctx, { activityId, runId }) { + const handle = temporalnexus.getClient().activity.getHandle(activityId, runId); + await handle.cancel('test custom cancellation'); + customCancelCalled = true; + }, + }), + }), + ], + }); + + await worker.runUntil(async () => { + const nexusSvc = client.nexus.createServiceClient({ endpoint: endpointName, service: temporalOpService }); + const targetActivityId = `wait-for-cancel-${randomUUID()}`; + const result = await nexusSvc.startOperation(temporalOpService.operations.blockingActivity, targetActivityId, { + id: randomUUID(), + }); + + await waitUntil(async () => activityStarted, 4000); + + const activityHandle = client.activity.getHandle(targetActivityId); + await result.cancel(); + + await waitUntil(async () => (await result.describe()).status === 'CANCELED', 4000); + await waitUntil(async () => (await activityHandle.describe()).status === 'CANCELED', 4000); + t.true(customCancelCalled, 'expected custom cancel to be called'); + }); +});