From 74b481558b8dd8ecbd947ff857b697f1127026cf Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Wed, 22 Apr 2026 13:55:14 -0700 Subject: [PATCH 1/4] feat(client): add standalone Nexus operations client Introduce NexusClient sub-client on Client for starting and managing standalone Nexus operations and interceptor support via ClientInterceptors.nexus. --- packages/client/package.json | 1 + packages/client/src/client.ts | 15 + packages/client/src/helpers.ts | 21 + packages/client/src/index.ts | 2 + packages/client/src/interceptors.ts | 100 ++- packages/client/src/nexus-client.ts | 723 ++++++++++++++++++ packages/client/src/nexus-types.ts | 459 +++++++++++ packages/core-bridge/sdk-core | 2 +- packages/core-bridge/src/metrics.rs | 5 +- .../test/src/test-nexus-standalone-types.ts | 206 +++++ packages/test/src/test-nexus-standalone.ts | 459 +++++++++++ 11 files changed, 1989 insertions(+), 4 deletions(-) create mode 100644 packages/client/src/nexus-client.ts create mode 100644 packages/client/src/nexus-types.ts create mode 100644 packages/test/src/test-nexus-standalone-types.ts create mode 100644 packages/test/src/test-nexus-standalone.ts diff --git a/packages/client/package.json b/packages/client/package.json index e009f52fb..e2e6f5ae1 100644 --- a/packages/client/package.json +++ b/packages/client/package.json @@ -18,6 +18,7 @@ "@temporalio/proto": "workspace:*", "abort-controller": "^3.0.0", "long": "^5.2.3", + "nexus-rpc": "^0.0.2", "uuid": "^11.1.0" }, "devDependencies": { diff --git a/packages/client/src/client.ts b/packages/client/src/client.ts index bcad9d271..c270d41c2 100644 --- a/packages/client/src/client.ts +++ b/packages/client/src/client.ts @@ -3,6 +3,7 @@ import { AsyncCompletionClient } from './async-completion-client'; import type { BaseClientOptions, LoadedWithDefaults } from './base-client'; import { BaseClient, defaultBaseClientOptions } from './base-client'; import type { ClientInterceptors } from './interceptors'; +import { NexusClient } from './nexus-client'; import { ScheduleClient } from './schedule-client'; import type { QueryRejectCondition, WorkflowService } from './types'; import { WorkflowClient } from './workflow-client'; @@ -62,6 +63,12 @@ export class Client extends BaseClient { * @experimental The Worker Versioning API is still being designed. Major changes are expected. */ public readonly taskQueue: TaskQueueClient; + /** + * Nexus sub-client - use to start and interact with standalone Nexus operations. + * + * @experimental Standalone Nexus operations are a new API and susceptible to change. + */ + public readonly nexus: NexusClient; constructor(options?: ClientOptions) { options = options ?? {}; @@ -108,6 +115,13 @@ export class Client extends BaseClient { dataConverter: this.dataConverter, }); + this.nexus = new NexusClient({ + ...commonOptions, + connection: this.connection, + dataConverter: this.dataConverter, + interceptors: interceptors?.nexus, + }); + this.options = { ...defaultBaseClientOptions(), ...filterNullAndUndefined(commonOptions), @@ -115,6 +129,7 @@ export class Client extends BaseClient { interceptors: { workflow: this.workflow.options.interceptors, schedule: this.schedule.options.interceptors, + nexus: this.nexus.options.interceptors, }, workflow: { queryRejectCondition: this.workflow.options.queryRejectCondition, diff --git a/packages/client/src/helpers.ts b/packages/client/src/helpers.ts index e34306a7f..c28a7d659 100644 --- a/packages/client/src/helpers.ts +++ b/packages/client/src/helpers.ts @@ -163,3 +163,24 @@ function getGrpcStatusDetails(err: GrpcServiceError): google.rpc.Status['details } return google.rpc.Status.decode(statusBuffer).details; } + +/** + * Try to extract the runId field from a gRPC ALREADY_EXISTS error's details containing + * NexusOperationExecutionAlreadyStartedFailure. Returns undefined if the error details + * do not include this failure type or cannot be decoded. + */ +export function extractNexusOperationAlreadyStartedRunId(err: GrpcServiceError): string | undefined { + try { + for (const entry of getGrpcStatusDetails(err) ?? []) { + if (!entry.type_url || !entry.value) continue; + const type = entry.type_url.replace(/^type.googleapis.com\//, '') as ErrorDetailsName; + if (type !== 'temporal.api.errordetails.v1.NexusOperationExecutionAlreadyStartedFailure') continue; + + const details = temporal.api.errordetails.v1.NexusOperationExecutionAlreadyStartedFailure.decode(entry.value); + return details.runId; + } + } catch { + // ignore + } + return undefined; +} diff --git a/packages/client/src/index.ts b/packages/client/src/index.ts index 9db7133cc..be68b1212 100644 --- a/packages/client/src/index.ts +++ b/packages/client/src/index.ts @@ -71,6 +71,8 @@ export * from './workflow-options'; export * from './schedule-types'; export * from './schedule-client'; export * from './task-queue-client'; +export * from './nexus-types'; +export * from './nexus-client'; export { WorkflowUpdateStage } from './workflow-update-stage'; export { WorkerBuildIdVersionSets, diff --git a/packages/client/src/interceptors.ts b/packages/client/src/interceptors.ts index 35ad98913..9bd7449e2 100644 --- a/packages/client/src/interceptors.ts +++ b/packages/client/src/interceptors.ts @@ -4,8 +4,17 @@ * @module */ +import type { Duration, SearchAttributePair, TypedSearchAttributes } from '@temporalio/common'; import { Headers, Next } from '@temporalio/common'; import type { temporal } from '@temporalio/proto'; +import type { NexusOperationHandle } from './nexus-client'; +import type { + NexusOperationExecutionCount, + NexusOperationExecutionDescription, + NexusOperationExecution, + NexusOperationIdConflictPolicy, + NexusOperationIdReusePolicy, +} from './nexus-types'; import type { CompiledScheduleOptions } from './schedule-types'; import type { DescribeWorkflowExecutionResponse, @@ -237,14 +246,101 @@ export type CreateScheduleOutput = { readonly conflictToken: Uint8Array; }; +/** + * Implement any of these methods to intercept NexusClient outbound calls. + */ +export interface NexusClientInterceptor { + /** Intercept a call to {@link NexusServiceClient.startOperation}. */ + startOperation?: ( + input: StartNexusOperationInput, + next: Next + ) => Promise>; + + /** Intercept {@link NexusOperationHandle.result}. */ + getResult?: (input: GetNexusOperationResultInput, next: Next) => Promise; + + /** Intercept {@link NexusOperationHandle.describe}. */ + describe?: ( + input: DescribeNexusOperationInput, + next: Next + ) => Promise; + + /** Intercept {@link NexusOperationHandle.cancel}. */ + cancel?: (input: CancelNexusOperationInput, next: Next) => Promise; + + /** Intercept {@link NexusOperationHandle.terminate}. */ + terminate?: (input: TerminateNexusOperationInput, next: Next) => Promise; + + /** Intercept {@link NexusClient.list}. */ + list?: (input: ListNexusOperationsInput, next: Next) => AsyncIterable; + + /** Intercept {@link NexusClient.count}. */ + count?: (input: CountNexusOperationsInput, next: Next) => Promise; +} + +/** Input for {@link NexusClientInterceptor.startOperation}. */ +export interface StartNexusOperationInput { + readonly endpoint: string; + readonly service: string; + readonly operation: string; + readonly id: string; + readonly arg: unknown; + readonly scheduleToCloseTimeout?: Duration; + readonly summary?: string; + readonly idReusePolicy?: NexusOperationIdReusePolicy; + readonly idConflictPolicy?: NexusOperationIdConflictPolicy; + readonly searchAttributes?: SearchAttributePair[] | TypedSearchAttributes; + readonly headers?: Record; +} + +/** Input for {@link NexusClientInterceptor.getResult}. */ +export interface GetNexusOperationResultInput { + readonly operationId: string; + readonly runId?: string; + /** Type hint for the deserialized result (used by interceptors/codecs). */ + readonly resultType?: unknown; +} + +/** Input for {@link NexusClientInterceptor.describe}. */ +export interface DescribeNexusOperationInput { + readonly operationId: string; + readonly runId?: string; + readonly longPollToken?: Uint8Array; +} + +/** Input for {@link NexusClientInterceptor.cancel}. */ +export interface CancelNexusOperationInput { + readonly operationId: string; + readonly runId?: string; + readonly reason?: string; +} + +/** Input for {@link NexusClientInterceptor.terminate}. */ +export interface TerminateNexusOperationInput { + readonly operationId: string; + readonly runId?: string; + readonly reason?: string; +} + +/** Input for {@link NexusClientInterceptor.list}. */ +export interface ListNexusOperationsInput { + readonly query?: string; + readonly pageSize?: number; +} + +/** Input for {@link NexusClientInterceptor.count}. */ +export interface CountNexusOperationsInput { + readonly query?: string; +} + /** * Interceptors for any high-level SDK client. - * - * NOTE: Currently only for {@link WorkflowClient} and {@link ScheduleClient}. More will be added later as needed. */ export interface ClientInterceptors { // eslint-disable-next-line @typescript-eslint/no-deprecated workflow?: WorkflowClientInterceptors | WorkflowClientInterceptor[]; schedule?: ScheduleClientInterceptor[]; + + nexus?: NexusClientInterceptor[]; } diff --git a/packages/client/src/nexus-client.ts b/packages/client/src/nexus-client.ts new file mode 100644 index 000000000..f447c16cf --- /dev/null +++ b/packages/client/src/nexus-client.ts @@ -0,0 +1,723 @@ +import { status as grpcStatus } from '@grpc/grpc-js'; +import type * as nexus from 'nexus-rpc'; +import { v4 as uuid4 } from 'uuid'; +import { composeInterceptors } from '@temporalio/common/lib/interceptors'; +import { SymbolBasedInstanceOfError } from '@temporalio/common/lib/type-helpers'; +import { + decodeTypedSearchAttributes, + encodeUnifiedSearchAttributes, + searchAttributePayloadConverter, +} from '@temporalio/common/lib/converter/payload-search-attributes'; +import { + decodeFromPayloadsAtIndex, + decodeOptionalFailureToOptionalError, + decodeOptionalSinglePayload, + encodeToPayload, +} from '@temporalio/common/lib/internal-non-workflow'; +import { filterNullAndUndefined } from '@temporalio/common/lib/internal-workflow'; +import { msOptionalToTs, optionalTsToDate, optionalTsToMs } from '@temporalio/common/lib/time'; +import { temporal } from '@temporalio/proto'; +import type { LoadedDataConverter } from '@temporalio/common'; +import type { BaseClientOptions, LoadedWithDefaults, WithDefaults } from './base-client'; +import { BaseClient, defaultBaseClientOptions } from './base-client'; +import { isGrpcServiceError, ServiceError } from './errors'; +import { rethrowKnownErrorTypes, extractNexusOperationAlreadyStartedRunId } from './helpers'; +import type { + CancelNexusOperationInput, + CountNexusOperationsInput, + DescribeNexusOperationInput, + GetNexusOperationResultInput, + ListNexusOperationsInput, + NexusClientInterceptor, + StartNexusOperationInput, + TerminateNexusOperationInput, +} from './interceptors'; +import { + decodeNexusOperationCancellationState, + decodeNexusOperationExecutionStatus, + decodePendingNexusOperationState, + encodeNexusOperationIdConflictPolicy, + encodeNexusOperationIdReusePolicy, +} from './nexus-types'; +import type { + DescribeNexusOperationOptions, + GetNexusOperationHandleOptions, + ListNexusOperationsOptions, + NexusOperationExecutionCancellationInfo, + NexusOperationExecutionCount, + NexusOperationExecutionDescription, + NexusOperationExecution, + RawNexusOperationExecutionCancellationInfo, + RawNexusOperationExecutionInfo, + RawNexusOperationExecutionListInfo, + StartNexusOperationOptions, +} from './nexus-types'; + +export interface NexusClientOptions extends BaseClientOptions { + /** + * Used to override and extend default client behavior. + */ + interceptors?: NexusClientInterceptor[]; +} + +export type LoadedNexusClientOptions = LoadedWithDefaults; + +function defaultNexusClientOptions(): WithDefaults { + return { + ...defaultBaseClientOptions(), + interceptors: [], + }; +} + +/** + * Handle to a standalone Nexus operation execution. + * + * Use this to poll for results, describe, cancel, or terminate the operation. + * + * @experimental Nexus Standalone Operations are experimental. + */ +export interface NexusOperationHandle { + readonly operationId: string; + readonly runId?: string; + readonly client: NexusClient; + + /** + * Wait for the operation to complete and return its result. + * + * @throws {@link NexusOperationFailureError} if the operation completes with a failure outcome. + * + * @experimental Nexus Standalone Operations are experimental. + */ + result(): Promise; + + /** + * Describe the Nexus operation execution. + * + * @experimental Nexus Standalone Operations are experimental. + */ + describe(options?: DescribeNexusOperationOptions): Promise; + + /** + * Request cancellation of the operation. + * + * @param reason optional reason for the cancellation. + * + * @experimental Nexus Standalone Operations are experimental. + */ + cancel(reason?: string): Promise; + + /** + * Terminate the Nexus operation execution immediately. + * + * @experimental Nexus Standalone Operations are experimental. + */ + terminate(reason?: string): Promise; +} + +type OperationInputOf = Op extends nexus.OperationDefinition ? I : unknown; +type OperationOutputOf = Op extends nexus.OperationDefinition ? O : unknown; + +/** + * Typed service client for a specific Nexus service + endpoint pair. + * + * Created via {@link NexusClient.createServiceClient}. Provides type-safe + * {@link startOperation} and {@link executeOperation} based on the service definition's operation types. + * + * @experimental Nexus Standalone Operations are experimental. + */ +export interface NexusServiceClient { + readonly endpoint: string; + readonly service: T; + + /** + * Start a Nexus operation and return a handle. + * + * @experimental Nexus Standalone Operations are experimental. + */ + startOperation( + operation: Op, + input: OperationInputOf, + options: StartNexusOperationOptions + ): Promise>>; + startOperation( + operationKey: K, + input: OperationInputOf, + options: StartNexusOperationOptions + ): Promise>>; + + /** + * Start a Nexus operation and wait for its result. + * + * Convenience for {@link startOperation} followed by {@link NexusOperationHandle.result}. + * + * @experimental Nexus Standalone Operations are experimental. + */ + executeOperation( + operation: Op, + input: OperationInputOf, + options: StartNexusOperationOptions + ): Promise>; + executeOperation( + operationKey: K, + input: OperationInputOf, + options: StartNexusOperationOptions + ): Promise>; +} + +type CachedOperationResult = + | { kind: 'pending' } + | { kind: 'complete'; value: T } + | { kind: 'failed'; failure: NexusOperationFailureError }; + +/** + * Client for standalone Nexus operations. Access via {@link Client.nexus}. + * + * Use {@link createServiceClient} to get a typed service client, or call the namespace-wide + * {@link list}, {@link count}, and {@link getHandle} methods directly. + * + * @see {@link Client} + * + * @experimental Nexus Standalone Operations are experimental. + */ +export class NexusClient extends BaseClient { + public readonly options: LoadedNexusClientOptions; + protected readonly interceptors: NexusClientInterceptor[]; + + constructor(options?: NexusClientOptions) { + super(options); + this.options = { + ...defaultNexusClientOptions(), + ...filterNullAndUndefined(options ?? {}), + loadedDataConverter: this.dataConverter, + }; + this.interceptors = this.options.interceptors; + } + + /** + * Create a typed service client for starting and executing Nexus operations on a specific endpoint + service. + * + * @experimental Nexus Standalone Operations are experimental. + */ + public createServiceClient(options: { + endpoint: string; + service: T; + }): NexusServiceClient { + const { endpoint, service } = options; + + const startOperation = async ( + operation: unknown, + input: unknown, + options: StartNexusOperationOptions + ): Promise> => { + const operationName = this._resolveOperationName(operation, service); + return await this._startNexusOperation({ + endpoint, + service: service.name, + operation: operationName, + arg: input, + id: options.id, + scheduleToCloseTimeout: options.scheduleToCloseTimeout, + summary: options.summary, + idReusePolicy: options.idReusePolicy, + idConflictPolicy: options.idConflictPolicy, + searchAttributes: options.searchAttributes, + headers: options.headers, + }); + }; + + const executeOperation = async ( + operation: unknown, + input: unknown, + options: StartNexusOperationOptions + ): Promise => { + const handle = await startOperation(operation, input, options); + return await handle.result(); + }; + + return { + endpoint: options.endpoint, + service: options.service, + + // need to cast from the widened signature + // to the typesafe signature of the interface + startOperation: startOperation as NexusServiceClient['startOperation'], + executeOperation: executeOperation as NexusServiceClient['executeOperation'], + }; + } + + /** + * Get a handle to an existing standalone Nexus operation. + * + * If {@link GetNexusOperationHandleOptions.runId} is not provided, operations on the handle + * (like {@link NexusOperationHandle.result}) will target the latest run. + * + * @experimental Nexus Standalone Operations are experimental. + */ + public getHandle( + operationId: string, + options?: GetNexusOperationHandleOptions + ): NexusOperationHandle ? T : O> { + return this._createNexusOperationHandle({ + operationId, + runId: options?.runId, + }); + } + + /** + * List standalone Nexus operations matching a visibility query. + * + * @experimental Nexus Standalone Operations are experimental. + */ + public list(options?: ListNexusOperationsOptions): AsyncIterable { + const input: ListNexusOperationsInput = { + query: options?.query, + pageSize: options?.pageSize, + }; + const next = this._listHandler.bind(this); + const list = composeInterceptors(this.interceptors, 'list', next); + return list(input); + } + + /** + * Count standalone Nexus operations matching a visibility query. + * + * @experimental Nexus Standalone Operations are experimental. + */ + public async count(query?: string): Promise { + const input: CountNexusOperationsInput = { query }; + const next = this._countHandler.bind(this); + const count = composeInterceptors(this.interceptors, 'count', next); + return await count(input); + } + + protected async _startNexusOperation(input: StartNexusOperationInput): Promise> { + const next = this._startNexusOperationHandler.bind(this); + const start = composeInterceptors(this.interceptors, 'startOperation', next); + return await start(input); + } + + protected async _getNexusOperationResult(input: GetNexusOperationResultInput): Promise { + const next = this._getResultHandler.bind(this); + const get = composeInterceptors(this.interceptors, 'getResult', next); + return await get(input); + } + + protected async _describeNexusOperation( + input: DescribeNexusOperationInput + ): Promise { + const next = this._describeHandler.bind(this); + const describe = composeInterceptors(this.interceptors, 'describe', next); + return await describe(input); + } + + protected async _cancelNexusOperation(input: CancelNexusOperationInput): Promise { + const next = this._cancelHandler.bind(this); + const cancel = composeInterceptors(this.interceptors, 'cancel', next); + await cancel(input); + } + + protected async _terminateNexusOperation(input: TerminateNexusOperationInput): Promise { + const next = this._terminateHandler.bind(this); + const terminate = composeInterceptors(this.interceptors, 'terminate', next); + await terminate(input); + } + + protected async _startNexusOperationHandler(input: StartNexusOperationInput): Promise> { + const inputPayload = await encodeToPayload(this.dataConverter, input.arg); + const searchAttributes = + input.searchAttributes != null + ? { indexedFields: encodeUnifiedSearchAttributes(undefined, input.searchAttributes) } + : undefined; + const userMetadata = + input.summary != null + ? { + summary: await encodeToPayload(this.dataConverter, input.summary), + } + : undefined; + const req: temporal.api.workflowservice.v1.IStartNexusOperationExecutionRequest = { + namespace: this.options.namespace, + identity: this.options.identity, + requestId: uuid4(), + operationId: input.id, + endpoint: input.endpoint, + service: input.service, + operation: input.operation, + scheduleToCloseTimeout: msOptionalToTs(input.scheduleToCloseTimeout), + input: inputPayload, + idReusePolicy: input.idReusePolicy != null ? encodeNexusOperationIdReusePolicy(input.idReusePolicy) : undefined, + idConflictPolicy: + input.idConflictPolicy != null ? encodeNexusOperationIdConflictPolicy(input.idConflictPolicy) : undefined, + searchAttributes, + nexusHeader: input.headers ?? {}, + userMetadata, + }; + let res: temporal.api.workflowservice.v1.IStartNexusOperationExecutionResponse; + try { + res = await this.connection.workflowService.startNexusOperationExecution(req); + } catch (err: unknown) { + this._handleStartError(err, input.id); + } + return this._createNexusOperationHandle({ + operationId: input.id, + runId: res.runId ?? undefined, + }); + } + + protected _createNexusOperationHandle(opts: { operationId: string; runId?: string }): NexusOperationHandle { + let cachedResult: CachedOperationResult = { kind: 'pending' }; + return { + operationId: opts.operationId, + runId: opts.runId, + client: this, + async result(): Promise { + switch (cachedResult.kind) { + case 'pending': { + try { + const result = await this.client._getNexusOperationResult({ + operationId: this.operationId, + runId: this.runId, + }); + cachedResult = { kind: 'complete', value: result as O }; + return cachedResult.value; + } catch (err) { + if (err instanceof NexusOperationFailureError) { + cachedResult = { kind: 'failed', failure: err }; + } + throw err; + } + } + case 'complete': + return cachedResult.value; + case 'failed': + throw cachedResult.failure; + } + }, + async describe(options?: DescribeNexusOperationOptions): Promise { + return await this.client._describeNexusOperation({ + operationId: this.operationId, + runId: this.runId, + longPollToken: options?.longPollToken, + }); + }, + async cancel(reason?: string): Promise { + return await this.client._cancelNexusOperation({ + operationId: this.operationId, + runId: this.runId, + reason, + }); + }, + async terminate(reason?: string): Promise { + return await this.client._terminateNexusOperation({ + operationId: this.operationId, + runId: this.runId, + reason, + }); + }, + }; + } + + protected _resolveOperationName(operation: unknown, service: nexus.ServiceDefinition): string { + if (typeof operation === 'string') return operation; + if (operation && typeof operation === 'object' && 'name' in operation) { + return (operation as nexus.OperationDefinition).name; + } + // Resolve by reference against the service's operations + for (const [k, v] of Object.entries(service.operations)) { + if (v === operation) return v.name ?? k; + } + throw new TypeError('Unable to resolve Nexus operation name'); + } + + protected async _getResultHandler(input: GetNexusOperationResultInput): Promise { + const req: temporal.api.workflowservice.v1.IPollNexusOperationExecutionRequest = { + namespace: this.options.namespace, + operationId: input.operationId, + runId: input.runId ?? '', + waitStage: temporal.api.enums.v1.NexusOperationWaitStage.NEXUS_OPERATION_WAIT_STAGE_CLOSED, + }; + for (;;) { + let res: temporal.api.workflowservice.v1.IPollNexusOperationExecutionResponse; + try { + res = await this.connection.workflowService.pollNexusOperationExecution(req); + } catch (err: unknown) { + if (isGrpcServiceError(err) && err.code === grpcStatus.DEADLINE_EXCEEDED) { + // Long-poll timed out without a terminal state. Retry. + continue; + } + this.rethrowGrpcError(err, 'Failed to poll Nexus operation result', input.operationId); + } + + // The operation is closed if we have a result or failure + if (res.result) { + return await decodeFromPayloadsAtIndex(this.dataConverter, 0, [res.result]); + } + if (res.failure) { + const cause = await decodeOptionalFailureToOptionalError(this.dataConverter, res.failure); + throw new NexusOperationFailureError( + `Nexus operation failed: ${res.failure.message ?? 'unknown failure'}`, + cause ?? new Error(res.failure.message ?? 'unknown failure') + ); + } + + // Wait stage may be reported as less than CLOSED if long-poll returned early; + // retry the poll. + continue; + } + } + + protected async _describeHandler(input: DescribeNexusOperationInput): Promise { + const req: temporal.api.workflowservice.v1.IDescribeNexusOperationExecutionRequest = { + namespace: this.options.namespace, + operationId: input.operationId, + runId: input.runId ?? '', + longPollToken: input.longPollToken, + }; + let res: temporal.api.workflowservice.v1.IDescribeNexusOperationExecutionResponse; + try { + res = await this.connection.workflowService.describeNexusOperationExecution(req); + } catch (err: unknown) { + this.rethrowGrpcError(err, 'Failed to describe Nexus operation', input.operationId); + } + if (!res.info) { + throw new ServiceError('Received invalid Nexus operation description from server: missing info'); + } + return await nexusOperationExecutionDescriptionFromProto( + res.info, + this.dataConverter, + res.longPollToken ?? undefined + ); + } + + protected async _cancelHandler(input: CancelNexusOperationInput): Promise { + const req: temporal.api.workflowservice.v1.IRequestCancelNexusOperationExecutionRequest = { + namespace: this.options.namespace, + operationId: input.operationId, + runId: input.runId ?? '', + identity: this.options.identity, + requestId: uuid4(), + reason: input.reason, + }; + try { + await this.connection.workflowService.requestCancelNexusOperationExecution(req); + } catch (err: unknown) { + this.rethrowGrpcError(err, 'Failed to cancel Nexus operation', input.operationId); + } + } + + protected async _terminateHandler(input: TerminateNexusOperationInput): Promise { + const req: temporal.api.workflowservice.v1.ITerminateNexusOperationExecutionRequest = { + namespace: this.options.namespace, + operationId: input.operationId, + runId: input.runId ?? '', + identity: this.options.identity, + requestId: uuid4(), + reason: input.reason, + }; + try { + await this.connection.workflowService.terminateNexusOperationExecution(req); + } catch (err: unknown) { + this.rethrowGrpcError(err, 'Failed to terminate Nexus operation', input.operationId); + } + } + + protected async *_listHandler(input: ListNexusOperationsInput): AsyncIterable { + let nextPageToken: Uint8Array | undefined = undefined; + for (;;) { + let response: temporal.api.workflowservice.v1.IListNexusOperationExecutionsResponse; + try { + response = await this.connection.workflowService.listNexusOperationExecutions({ + namespace: this.options.namespace, + query: input.query, + pageSize: input.pageSize, + nextPageToken, + }); + } catch (err: unknown) { + this.rethrowGrpcError(err, 'Failed to list Nexus operations', undefined); + } + for (const raw of response.operations ?? []) { + yield nexusOperationListInfoFromProto(raw); + } + if (response.nextPageToken == null || response.nextPageToken.length === 0) break; + nextPageToken = response.nextPageToken; + } + } + + protected async _countHandler(input: CountNexusOperationsInput): Promise { + try { + const res = await this.connection.workflowService.countNexusOperationExecutions({ + namespace: this.options.namespace, + query: input.query, + }); + return nexusCountFromProto(res); + } catch (err: unknown) { + this.rethrowGrpcError(err, 'Failed to count Nexus operations', undefined); + } + } + + /** + * Handle errors from StartNexusOperationExecution: unpack ALREADY_EXISTS details and throw + * {@link NexusOperationAlreadyStartedError} if applicable. + */ + protected _handleStartError(err: unknown, operationId: string): never { + if (isGrpcServiceError(err) && err.code === grpcStatus.ALREADY_EXISTS) { + throw new NexusOperationAlreadyStartedError(operationId, extractNexusOperationAlreadyStartedRunId(err)); + } + this.rethrowGrpcError(err, 'Failed to start Nexus operation', operationId); + } + + protected rethrowGrpcError(err: unknown, fallbackMessage: string, operationId?: string, runId?: string): never { + if (isGrpcServiceError(err)) { + rethrowKnownErrorTypes(err); + + if (err.code === grpcStatus.NOT_FOUND && operationId != null) { + throw new NexusOperationNotFoundError(operationId, runId); + } + + throw new ServiceError(fallbackMessage, { cause: err }); + } + + throw new ServiceError('Unexpected error while making gRPC request', { cause: err as Error }); + } +} + +async function cancellationInfoFromProto( + raw: RawNexusOperationExecutionCancellationInfo, + dataConverter: LoadedDataConverter +): Promise { + return { + requestedTime: optionalTsToDate(raw.requestedTime), + state: decodeNexusOperationCancellationState(raw.state), + attempt: raw.attempt ?? 0, + lastAttemptCompleteTime: optionalTsToDate(raw.lastAttemptCompleteTime), + nextAttemptScheduleTime: optionalTsToDate(raw.nextAttemptScheduleTime), + lastAttemptFailure: await decodeOptionalFailureToOptionalError(dataConverter, raw.lastAttemptFailure), + blockedReason: raw.blockedReason ?? undefined, + reason: raw.reason ?? '', + raw, + }; +} + +async function nexusOperationExecutionDescriptionFromProto( + raw: RawNexusOperationExecutionInfo, + dataConverter: LoadedDataConverter, + longPollToken: Uint8Array | undefined +): Promise { + let decodedMetadata: + | { state: 'resolved'; summary: string | undefined; details: string | undefined } + | { state: 'pending' } = { + state: 'pending', + }; + const decodeMetadata = async () => { + if (decodedMetadata.state === 'pending') { + decodedMetadata = { + state: 'resolved', + summary: (await decodeOptionalSinglePayload(dataConverter, raw.userMetadata?.summary)) ?? undefined, + details: (await decodeOptionalSinglePayload(dataConverter, raw.userMetadata?.details)) ?? undefined, + }; + } + return decodedMetadata; + }; + return { + operationId: raw.operationId ?? '', + runId: raw.runId ?? '', + endpoint: raw.endpoint ?? '', + service: raw.service ?? '', + operation: raw.operation ?? '', + status: decodeNexusOperationExecutionStatus(raw.status), + state: decodePendingNexusOperationState(raw.state), + scheduleToCloseTimeout: optionalTsToMs(raw.scheduleToCloseTimeout), + scheduleToStartTimeout: optionalTsToMs(raw.scheduleToStartTimeout), + startToCloseTimeout: optionalTsToMs(raw.startToCloseTimeout), + attempt: raw.attempt ?? 0, + scheduleTime: optionalTsToDate(raw.scheduleTime), + expirationTime: optionalTsToDate(raw.expirationTime), + closeTime: optionalTsToDate(raw.closeTime), + lastAttemptCompleteTime: optionalTsToDate(raw.lastAttemptCompleteTime), + lastAttemptFailure: await decodeOptionalFailureToOptionalError(dataConverter, raw.lastAttemptFailure), + nextAttemptScheduleTime: optionalTsToDate(raw.nextAttemptScheduleTime), + executionDuration: optionalTsToMs(raw.executionDuration), + cancellationInfo: raw.cancellationInfo + ? await cancellationInfoFromProto(raw.cancellationInfo, dataConverter) + : undefined, + blockedReason: raw.blockedReason ?? undefined, + requestId: raw.requestId ?? '', + operationToken: raw.operationToken ?? undefined, + stateTransitionCount: raw.stateTransitionCount?.toNumber() ?? 0, + searchAttributes: decodeTypedSearchAttributes(raw.searchAttributes?.indexedFields), + identity: raw.identity ?? '', + longPollToken, + raw, + staticDetails: async () => (await decodeMetadata()).details, + staticSummary: async () => (await decodeMetadata()).summary, + }; +} + +function nexusOperationListInfoFromProto(raw: RawNexusOperationExecutionListInfo): NexusOperationExecution { + return { + operationId: raw.operationId ?? '', + runId: raw.runId ?? '', + endpoint: raw.endpoint ?? '', + service: raw.service ?? '', + operation: raw.operation ?? '', + scheduleTime: optionalTsToDate(raw.scheduleTime), + closeTime: optionalTsToDate(raw.closeTime), + status: decodeNexusOperationExecutionStatus(raw.status), + searchAttributes: decodeTypedSearchAttributes(raw.searchAttributes?.indexedFields), + stateTransitionCount: raw.stateTransitionCount?.toNumber() ?? 0, + executionDuration: optionalTsToMs(raw.executionDuration), + raw, + }; +} + +function nexusCountFromProto( + raw: temporal.api.workflowservice.v1.ICountNexusOperationExecutionsResponse +): NexusOperationExecutionCount { + return { + count: raw.count?.toNumber() ?? 0, + groups: (raw.groups ?? []).map((group) => ({ + count: group.count?.toNumber() ?? 0, + groupValues: (group.groupValues ?? []).map((v) => searchAttributePayloadConverter.fromPayload(v)), + })), + }; +} + +/** + * Thrown by {@link NexusOperationHandle.result} when the operation completes with a failure outcome. + * The original failure is available on `cause`. + */ +@SymbolBasedInstanceOfError('NexusOperationFailureError') +export class NexusOperationFailureError extends Error { + public constructor( + message: string, + public readonly cause: Error + ) { + super(message, { cause }); + } +} + +/** + * Thrown by {@link NexusServiceClient.startOperation} when the server returns ALREADY_EXISTS + * because an operation with the given ID already exists (and the reuse/conflict policies disallow reuse. + */ +@SymbolBasedInstanceOfError('NexusOperationAlreadyStartedError') +export class NexusOperationAlreadyStartedError extends Error { + public constructor( + public readonly operationId: string, + public readonly runId?: string + ) { + super(`Nexus operation already started: operation_id=${operationId} run_id=${runId}`); + } +} + +/** + * Thrown when a Nexus Operation with the given operationId and runId is not known by the Temporal Server. + */ +@SymbolBasedInstanceOfError('NexusOperationNotFoundError') +export class NexusOperationNotFoundError extends Error { + public constructor( + public readonly operationId: string, + public readonly runId?: string + ) { + super(`Nexus operation not found: operation_id=${operationId} run_id=${runId}`); + } +} diff --git a/packages/client/src/nexus-types.ts b/packages/client/src/nexus-types.ts new file mode 100644 index 000000000..ece6e3e08 --- /dev/null +++ b/packages/client/src/nexus-types.ts @@ -0,0 +1,459 @@ +import type { Duration, SearchAttributePair, SearchAttributeType, TypedSearchAttributes } from '@temporalio/common'; +import type { TypedSearchAttributeValue } from '@temporalio/common/lib/search-attributes'; +import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow'; +import type { temporal } from '@temporalio/proto'; +import type { Replace } from '@temporalio/common/lib/type-helpers'; + +/** + * Defines whether to allow re-using an operation ID from a previously *completed* Nexus operation. + * + * See {@link NexusOperationIdConflictPolicy} for handling ID duplication with a *running* operation. + */ +export const NexusOperationIdReusePolicy = { + ALLOW_DUPLICATE: 'ALLOW_DUPLICATE', + ALLOW_DUPLICATE_FAILED_ONLY: 'ALLOW_DUPLICATE_FAILED_ONLY', + REJECT_DUPLICATE: 'REJECT_DUPLICATE', +} as const; +export type NexusOperationIdReusePolicy = + (typeof NexusOperationIdReusePolicy)[keyof typeof NexusOperationIdReusePolicy]; + +export const [encodeNexusOperationIdReusePolicy, decodeNexusOperationIdReusePolicy] = makeProtoEnumConverters< + temporal.api.enums.v1.NexusOperationIdReusePolicy, + typeof temporal.api.enums.v1.NexusOperationIdReusePolicy, + keyof typeof temporal.api.enums.v1.NexusOperationIdReusePolicy, + typeof NexusOperationIdReusePolicy, + 'NEXUS_OPERATION_ID_REUSE_POLICY_' +>( + { + [NexusOperationIdReusePolicy.ALLOW_DUPLICATE]: 1, + [NexusOperationIdReusePolicy.ALLOW_DUPLICATE_FAILED_ONLY]: 2, + [NexusOperationIdReusePolicy.REJECT_DUPLICATE]: 3, + UNSPECIFIED: 0, + } as const, + 'NEXUS_OPERATION_ID_REUSE_POLICY_' +); + +/** + * Defines how to resolve an operation ID conflict with a *running* Nexus operation. + * + * See {@link NexusOperationIdReusePolicy} for handling operation ID duplication with a *closed* operation. + */ +export const NexusOperationIdConflictPolicy = { + FAIL: 'FAIL', + USE_EXISTING: 'USE_EXISTING', +} as const; +export type NexusOperationIdConflictPolicy = + (typeof NexusOperationIdConflictPolicy)[keyof typeof NexusOperationIdConflictPolicy]; + +export const [encodeNexusOperationIdConflictPolicy, decodeNexusOperationIdConflictPolicy] = makeProtoEnumConverters< + temporal.api.enums.v1.NexusOperationIdConflictPolicy, + typeof temporal.api.enums.v1.NexusOperationIdConflictPolicy, + keyof typeof temporal.api.enums.v1.NexusOperationIdConflictPolicy, + typeof NexusOperationIdConflictPolicy, + 'NEXUS_OPERATION_ID_CONFLICT_POLICY_' +>( + { + [NexusOperationIdConflictPolicy.FAIL]: 1, + [NexusOperationIdConflictPolicy.USE_EXISTING]: 2, + UNSPECIFIED: 0, + } as const, + 'NEXUS_OPERATION_ID_CONFLICT_POLICY_' +); + +/** + * A general status for a Nexus operation, indicating whether it is currently running or in a terminal state. + */ +export const NexusOperationExecutionStatus = { + RUNNING: 'RUNNING', + COMPLETED: 'COMPLETED', + FAILED: 'FAILED', + CANCELED: 'CANCELED', + TERMINATED: 'TERMINATED', + TIMED_OUT: 'TIMED_OUT', +} as const; +export type NexusOperationExecutionStatus = + (typeof NexusOperationExecutionStatus)[keyof typeof NexusOperationExecutionStatus]; + +export const [encodeNexusOperationExecutionStatus, decodeNexusOperationExecutionStatus] = makeProtoEnumConverters< + temporal.api.enums.v1.NexusOperationExecutionStatus, + typeof temporal.api.enums.v1.NexusOperationExecutionStatus, + keyof typeof temporal.api.enums.v1.NexusOperationExecutionStatus, + typeof NexusOperationExecutionStatus, + 'NEXUS_OPERATION_EXECUTION_STATUS_' +>( + { + [NexusOperationExecutionStatus.RUNNING]: 1, + [NexusOperationExecutionStatus.COMPLETED]: 2, + [NexusOperationExecutionStatus.FAILED]: 3, + [NexusOperationExecutionStatus.CANCELED]: 4, + [NexusOperationExecutionStatus.TERMINATED]: 5, + [NexusOperationExecutionStatus.TIMED_OUT]: 6, + UNSPECIFIED: 0, + } as const, + 'NEXUS_OPERATION_EXECUTION_STATUS_' +); + +/** + * A more detailed breakdown of {@link NexusOperationExecutionStatus.RUNNING}. + */ +export const PendingNexusOperationState = { + SCHEDULED: 'SCHEDULED', + BACKING_OFF: 'BACKING_OFF', + STARTED: 'STARTED', + BLOCKED: 'BLOCKED', +} as const; +export type PendingNexusOperationState = (typeof PendingNexusOperationState)[keyof typeof PendingNexusOperationState]; + +export const [encodePendingNexusOperationState, decodePendingNexusOperationState] = makeProtoEnumConverters< + temporal.api.enums.v1.PendingNexusOperationState, + typeof temporal.api.enums.v1.PendingNexusOperationState, + keyof typeof temporal.api.enums.v1.PendingNexusOperationState, + typeof PendingNexusOperationState, + 'PENDING_NEXUS_OPERATION_STATE_' +>( + { + [PendingNexusOperationState.SCHEDULED]: 1, + [PendingNexusOperationState.BACKING_OFF]: 2, + [PendingNexusOperationState.STARTED]: 3, + [PendingNexusOperationState.BLOCKED]: 4, + UNSPECIFIED: 0, + } as const, + 'PENDING_NEXUS_OPERATION_STATE_' +); + +/** + * State of a Nexus operation cancellation. + */ +export const NexusOperationCancellationState = { + SCHEDULED: 'SCHEDULED', + BACKING_OFF: 'BACKING_OFF', + SUCCEEDED: 'SUCCEEDED', + FAILED: 'FAILED', + TIMED_OUT: 'TIMED_OUT', + BLOCKED: 'BLOCKED', +} as const; +export type NexusOperationCancellationState = + (typeof NexusOperationCancellationState)[keyof typeof NexusOperationCancellationState]; + +export const [encodeNexusOperationCancellationState, decodeNexusOperationCancellationState] = makeProtoEnumConverters< + temporal.api.enums.v1.NexusOperationCancellationState, + typeof temporal.api.enums.v1.NexusOperationCancellationState, + keyof typeof temporal.api.enums.v1.NexusOperationCancellationState, + typeof NexusOperationCancellationState, + 'NEXUS_OPERATION_CANCELLATION_STATE_' +>( + { + [NexusOperationCancellationState.SCHEDULED]: 1, + [NexusOperationCancellationState.BACKING_OFF]: 2, + [NexusOperationCancellationState.SUCCEEDED]: 3, + [NexusOperationCancellationState.FAILED]: 4, + [NexusOperationCancellationState.TIMED_OUT]: 5, + [NexusOperationCancellationState.BLOCKED]: 6, + UNSPECIFIED: 0, + } as const, + 'NEXUS_OPERATION_CANCELLATION_STATE_' +); + +export type RawNexusOperationExecutionInfo = temporal.api.nexus.v1.INexusOperationExecutionInfo; +export type RawNexusOperationExecutionListInfo = temporal.api.nexus.v1.INexusOperationExecutionListInfo; +export type RawNexusOperationExecutionCancellationInfo = temporal.api.nexus.v1.INexusOperationExecutionCancellationInfo; + +/** + * Cancellation state for a standalone Nexus operation. + */ +export interface NexusOperationExecutionCancellationInfo { + /** + * The time when cancellation was requested. + */ + readonly requestedTime: Date | undefined; + + /** + * The current state of the cancellation request. + */ + readonly state: NexusOperationCancellationState | undefined; + + /** + * The number of attempts made to deliver the cancel operation request. + */ + readonly attempt: number; + + /** + * The time when the last attempt completed. + */ + readonly lastAttemptCompleteTime: Date | undefined; + + /** + * The time when the next attempt is scheduled. + */ + readonly nextAttemptScheduleTime: Date | undefined; + + /** + * The last attempt's failure, if any. + */ + readonly lastAttemptFailure: Error | undefined; + + /** + * Blocked reason provides additional information if the cancellation state is {@link NexusOperationCancellationState.BLOCKED}. + */ + readonly blockedReason: string | undefined; + + /** + * The reason specified in the cancellation request. + */ + readonly reason: string; + + /** + * Underlying protobuf cancellation info. + */ + readonly raw: RawNexusOperationExecutionCancellationInfo; +} + +/** + * Info for a standalone Nexus operation execution, from list response. + */ +export interface NexusOperationExecution { + /** + * Unique identifier of this operation. + */ + readonly operationId: string; + + /** + * The run ID of the standalone Nexus operation. + */ + readonly runId: string; + + /** + * Endpoint name. + */ + readonly endpoint: string; + + /** + * Service name. + */ + readonly service: string; + + /** + * Operation name. + */ + readonly operation: string; + + /** + * The time the operation was originally scheduled. + */ + readonly scheduleTime: Date | undefined; + + /** + * Time the operation reached a terminal status, if closed. + */ + readonly closeTime: Date | undefined; + + /** + * Current status of the operation. + */ + readonly status: NexusOperationExecutionStatus | undefined; + + /** + * Current set of search attributes if any. + */ + readonly searchAttributes: TypedSearchAttributes; + + /** + * Number of state transitions. + */ + readonly stateTransitionCount: number; + + /** + * Duration from scheduled to close time, only populated if closed. + */ + readonly executionDuration: number | undefined; + + /** + * Underlying protobuf info. + */ + readonly raw: RawNexusOperationExecutionListInfo; +} + +/** + * Detailed information about a standalone Nexus operation execution. + */ +export type NexusOperationExecutionDescription = Replace< + NexusOperationExecution, + { + raw: temporal.api.nexus.v1.INexusOperationExecutionInfo; + } +> & { + /** + * More detailed breakdown if status is {@link NexusOperationExecutionStatus.RUNNING}. + */ + readonly state: PendingNexusOperationState | undefined; + + /** + * Schedule-to-close timeout for this operation in milliseconds. + */ + readonly scheduleToCloseTimeout: number | undefined; + + /** + * Schedule-to-start timeout for this operation in milliseconds. + */ + readonly scheduleToStartTimeout: number | undefined; + + /** + * Start-to-close timeout for this operation in milliseconds. + */ + readonly startToCloseTimeout: number | undefined; + + /** + * Current attempt number. + */ + readonly attempt: number; + + /** + * Scheduled time plus schedule_to_close_timeout. + */ + readonly expirationTime: Date | undefined; + + /** + * Time when the last attempt completed. + */ + readonly lastAttemptCompleteTime: Date | undefined; + + /** + * Time when the next attempt will be scheduled. + */ + readonly nextAttemptScheduleTime: Date | undefined; + + /** + * Failure from the last failed attempt, if any. + */ + readonly lastAttemptFailure: Error | undefined; + + /** + * Reason the operation is blocked, if any. + */ + readonly blockedReason: string | undefined; + + /** + * Server-generated request ID used as an idempotency token. + */ + readonly requestId: string; + + /** + * operationToken is only set for asynchronous operations after a successful start_operation call. + */ + readonly operationToken: string | undefined; + + /** + * Identity of the client that started this operation. + */ + readonly identity: string; + + /** + * Cancellation info if cancellation was requested. + */ + readonly cancellationInfo: NexusOperationExecutionCancellationInfo | undefined; + + /** + * Token for follow-on long-poll requests. None if the operation is complete. + */ + readonly longPollToken: Uint8Array | undefined; + + staticSummary: () => Promise; + staticDetails: () => Promise; +}; + +/** + * Count of standalone Nexus operation executions, optionally grouped by a search attribute. + */ +export interface NexusOperationExecutionCount { + readonly count: number; + readonly groups: readonly NexusOperationExecutionCountGroup[]; +} + +/** + * A group within a count aggregation. + */ +export interface NexusOperationExecutionCountGroup { + readonly count: number; + readonly groupValues: readonly TypedSearchAttributeValue[]; +} + +/** + * Options for starting a standalone Nexus operation via + * {@link NexusServiceClient.startOperation} or {@link NexusServiceClient.executeOperation}. + */ +export interface StartNexusOperationOptions { + /** + * Caller-side identifier for this operation. Must be unique among operations in the + * same namespace, subject to {@link idReusePolicy} and {@link idConflictPolicy}. + */ + id: string; + + /** + * Schedule-to-close timeout for this operation. + */ + scheduleToCloseTimeout?: Duration; + + /** + * Single-line Temporal-markdown summary used by user interfaces to display operation metadata. + */ + summary?: string; + + /** + * How to handle an operation ID that was used by a previously closed operation. + * @default {@link NexusOperationIdReusePolicy.ALLOW_DUPLICATE} + */ + idReusePolicy?: NexusOperationIdReusePolicy; + + /** + * How to handle an operation ID that is in use by a currently running operation. + * @default {@link NexusOperationIdConflictPolicy.FAIL} + */ + idConflictPolicy?: NexusOperationIdConflictPolicy; + + /** + * Search attributes for indexing. + */ + searchAttributes?: SearchAttributePair[] | TypedSearchAttributes; + + /** + * Headers to attach to the Nexus request. Transmitted to the handler as-is. + * Useful for propagating tracing information. + */ + headers?: Record; +} + +/** + * Options for {@link NexusClient.list}. + */ +export interface ListNexusOperationsOptions { + /** + * Visibility list filter query. See https://docs.temporal.io/list-filter for syntax. + */ + query?: string; + + /** + * Maximum number of operations to return per page. + */ + pageSize?: number; +} + +/** + * Options for {@link NexusClient.getHandle}. + */ +export interface GetNexusOperationHandleOptions { + /** + * If provided, targets this specific run of the operation. If absent, targets the latest run. + */ + runId?: string; +} + +/** + * Options for {@link NexusOperationHandle.describe} + */ +export interface DescribeNexusOperationOptions { + /** + * Token from a previous describe response. + * If provided, the request will long-poll until the Nexus Operation state changes. + */ + longPollToken?: Uint8Array; +} diff --git a/packages/core-bridge/sdk-core b/packages/core-bridge/sdk-core index 08adc4a43..1e3298a06 160000 --- a/packages/core-bridge/sdk-core +++ b/packages/core-bridge/sdk-core @@ -1 +1 @@ -Subproject commit 08adc4a43be357a214fe80ae64f877a0ca8c8ee2 +Subproject commit 1e3298a0657e521b37470c2530fe0bbb0f09377f diff --git a/packages/core-bridge/src/metrics.rs b/packages/core-bridge/src/metrics.rs index 4e649d6e1..e1ee7ddd2 100644 --- a/packages/core-bridge/src/metrics.rs +++ b/packages/core-bridge/src/metrics.rs @@ -48,7 +48,10 @@ pub fn init(cx: &mut neon::prelude::ModuleContext) -> neon::prelude::NeonResult< )?; cx.export_function("setMetricGaugeValue", set_metric_gauge_value)?; cx.export_function("setMetricGaugeF64Value", set_metric_gauge_f64_value)?; - cx.export_function("addMetricUpDownCounterValue", add_metric_up_down_counter_value)?; + cx.export_function( + "addMetricUpDownCounterValue", + add_metric_up_down_counter_value, + )?; Ok(()) } diff --git a/packages/test/src/test-nexus-standalone-types.ts b/packages/test/src/test-nexus-standalone-types.ts new file mode 100644 index 000000000..e761f702c --- /dev/null +++ b/packages/test/src/test-nexus-standalone-types.ts @@ -0,0 +1,206 @@ +import test from 'ava'; +import * as nexus from 'nexus-rpc'; +import type { Client, NexusOperationHandle } from '@temporalio/client'; + +interface MyInput { + value: string; +} + +interface MyOutput { + result: string; +} + +const myService = nexus.service('myService', { + mySyncOp: nexus.operation(), + myOtherOp: nexus.operation(), +}); + +const otherService = nexus.service('otherService', { + stringOp: nexus.operation(), +}); + +declare const client: Client; + +test('executeOperation with operation definition infers output type', async (t) => { + async function _assertion() { + const nexusClient = client.nexus.createServiceClient({ + endpoint: 'my-endpoint', + service: myService, + }); + + const _output: MyOutput = await nexusClient.executeOperation( + myService.operations.mySyncOp, + { value: 'hello' }, + { id: 'op-1', scheduleToCloseTimeout: '10s' } + ); + } + t.pass(); +}); + +test('executeOperation with key-based lookup infers output type', async (t) => { + async function _assertion() { + const nexusClient = client.nexus.createServiceClient({ + endpoint: 'my-endpoint', + service: myService, + }); + + const _output: MyOutput = await nexusClient.executeOperation( + 'mySyncOp', + { value: 'hello' }, + { id: 'op-1', scheduleToCloseTimeout: '10s' } + ); + } + t.pass(); +}); + +test('startOperation + handle.result() preserves type', async (t) => { + async function _assertion() { + const nexusClient = client.nexus.createServiceClient({ + endpoint: 'my-endpoint', + service: myService, + }); + + const _handle: NexusOperationHandle = await nexusClient.startOperation( + myService.operations.mySyncOp, + { value: 'hello' }, + { id: 'op-1', scheduleToCloseTimeout: '10s' } + ); + const _handleOutput: MyOutput = await _handle.result(); + } + t.pass(); +}); + +test('Different operation in the same service infers different types', async (t) => { + async function _assertion() { + const nexusClient = client.nexus.createServiceClient({ + endpoint: 'my-endpoint', + service: myService, + }); + + const _numberOutput: number = await nexusClient.executeOperation(myService.operations.myOtherOp, 'input-string', { + id: 'op-1', + scheduleToCloseTimeout: '10s', + }); + } + t.pass(); +}); + +test('getHandle with string defaults to unknown', (t) => { + function _assertion() { + const _anyHandle: NexusOperationHandle = client.nexus.getHandle('op-1'); + } + t.pass(); +}); + +test('getHandle with generic type parameter infers correctly', async (t) => { + async function _assertion() { + const _typedHandle: NexusOperationHandle = client.nexus.getHandle('op-1'); + const _typedOutput: MyOutput = await _typedHandle.result(); + + const _typedHandleFromOp: NexusOperationHandle = + client.nexus.getHandle('op-1'); + const _typedOutputFromOp: MyOutput = await _typedHandleFromOp.result(); + } + t.pass(); +}); + +test('executeOperation with wrong input type produces type error', async (t) => { + async function _assertion() { + const nexusClient = client.nexus.createServiceClient({ + endpoint: 'my-endpoint', + service: myService, + }); + + // @ts-expect-error - input must be MyInput, not string + await nexusClient.executeOperation(myService.operations.mySyncOp, 'wrong-input-type', { + id: 'op-1', + scheduleToCloseTimeout: '10s', + }); + } + t.pass(); +}); + +test('startOperation with wrong input type produces type error', async (t) => { + async function _assertion() { + const nexusClient = client.nexus.createServiceClient({ + endpoint: 'my-endpoint', + service: myService, + }); + + // @ts-expect-error - input must be MyInput, not string + await nexusClient.startOperation(myService.operations.mySyncOp, 'wrong-input-type', { + id: 'op-1', + scheduleToCloseTimeout: '10s', + }); + } + t.pass(); +}); + +test('Operation from a different service produces type error', async (t) => { + async function _assertion() { + const nexusClient = client.nexus.createServiceClient({ + endpoint: 'my-endpoint', + service: myService, + }); + + // @ts-expect-error - otherService.stringOp is not an operation of myService + await nexusClient.executeOperation(otherService.operations.stringOp, 'hello', { + id: 'op-1', + scheduleToCloseTimeout: '10s', + }); + } + t.pass(); +}); + +test('Mismatched result type on handle produces type error', async (t) => { + async function _assertion() { + const nexusClient = client.nexus.createServiceClient({ + endpoint: 'my-endpoint', + service: myService, + }); + + // @ts-expect-error - Type 'NexusOperationHandle' not assignable to 'NexusOperationHandle' + const _badHandle: NexusOperationHandle = await nexusClient.startOperation( + myService.operations.mySyncOp, + { value: 'hello' }, + { id: 'op-1', scheduleToCloseTimeout: '10s' } + ); + } + t.pass(); +}); + +test('Mismatched output type on execute produces type error', async (t) => { + async function _assertion() { + const nexusClient = client.nexus.createServiceClient({ + endpoint: 'my-endpoint', + service: myService, + }); + + // @ts-expect-error - Type 'MyOutput' not assignable to 'string' + const _badOutput: string = await nexusClient.executeOperation( + myService.operations.mySyncOp, + { value: 'hello' }, + { id: 'op-1', scheduleToCloseTimeout: '10s' } + ); + } + t.pass(); +}); + +test('Missing required id option produces type error', async (t) => { + async function _assertion() { + const nexusClient = client.nexus.createServiceClient({ + endpoint: 'my-endpoint', + service: myService, + }); + + // @ts-expect-error - id is required + await nexusClient.executeOperation( + myService.operations.mySyncOp, + { value: 'hello' }, + { + scheduleToCloseTimeout: '10s', + } + ); + } + t.pass(); +}); diff --git a/packages/test/src/test-nexus-standalone.ts b/packages/test/src/test-nexus-standalone.ts new file mode 100644 index 000000000..3b32bbfa2 --- /dev/null +++ b/packages/test/src/test-nexus-standalone.ts @@ -0,0 +1,459 @@ +import { randomUUID } from 'crypto'; +import * as nexus from 'nexus-rpc'; +import { + Client, + NexusOperationAlreadyStartedError, + NexusOperationFailureError, + NexusOperationIdConflictPolicy, + NexusOperationIdReusePolicy, + type NexusClientInterceptor, + type StartNexusOperationInput, + type GetNexusOperationResultInput, + type DescribeNexusOperationInput, + type CancelNexusOperationInput, + type TerminateNexusOperationInput, + type ListNexusOperationsInput, + type CountNexusOperationsInput, +} from '@temporalio/client'; +import * as temporalnexus from '@temporalio/nexus'; +import * as workflow from '@temporalio/workflow'; +import { CancelledFailure, TerminatedFailure, ApplicationFailure, NexusOperationFailure } from '@temporalio/common'; +import { helpers, makeTestFunction } from './helpers-integration'; + +const test = makeTestFunction({ workflowsPath: __filename }); + +export const unblockEcho = workflow.defineUpdate('unblockEcho'); + +export async function blockingEcho(input: string): Promise { + let unblocked = false; + + workflow.setHandler(unblockEcho, () => { + unblocked = true; + }); + + await workflow.condition(() => unblocked); + + return input; +} + +const testService = nexus.service('testService', { + echo: nexus.operation(), + fail: nexus.operation<'application' | 'handler', string>(), + blockingAsync: nexus.operation<{ echo: string; wfId?: string }, string>(), +}); + +// Creates a test handler and a promise that can be used to wait until the workflow +// started by blockingAsync has been started. +function makeTestHandler() { + let markWorkflowStarted: () => void; + const workflowStarted = new Promise((resolve) => { + markWorkflowStarted = resolve; + }); + return { + workflowStarted, + handler: nexus.serviceHandler(testService, { + async echo(_ctx, input) { + return { value: input }; + }, + async fail(_ctx, input) { + switch (input) { + case 'application': + throw ApplicationFailure.create({ + message: 'test-application-failure', + nonRetryable: true, + }); + case 'handler': + throw new nexus.HandlerError(nexus.HandlerErrorType.INTERNAL, 'test-error', { retryableOverride: false }); + } + }, + blockingAsync: new temporalnexus.WorkflowRunOperationHandler<{ echo: string; wfId?: string }, string>( + async (ctx, input) => { + const handle = await temporalnexus.startWorkflow(ctx, blockingEcho, { + workflowId: input.wfId ?? randomUUID(), + args: [input.echo], + }); + markWorkflowStarted(); + return handle; + } + ), + }), + }; +} + +test('start sync operation and get result', async (t) => { + const { createWorker, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + const { handler } = makeTestHandler(); + const worker = await createWorker({ nexusServices: [handler] }); + + await worker.runUntil(async () => { + const svc = t.context.env.client.nexus.createServiceClient({ endpoint: endpointName, service: testService }); + const handle = await svc.startOperation(testService.operations.echo, 'hello', { + id: 'op-' + randomUUID(), + scheduleToCloseTimeout: '10s', + }); + t.is(typeof handle.operationId, 'string'); + const result = await handle.result(); + t.is(result.value, 'hello'); + }); +}); + +test('start async operation and poll result', async (t) => { + const { createWorker, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + const { handler } = makeTestHandler(); + const worker = await createWorker({ nexusServices: [handler] }); + + await worker.runUntil(async () => { + const svc = t.context.env.client.nexus.createServiceClient({ endpoint: endpointName, service: testService }); + const handle = await svc.startOperation(testService.operations.echo, 'async-hello', { + id: 'op-' + randomUUID(), + scheduleToCloseTimeout: '10s', + }); + const result = await handle.result(); + t.is(result.value, 'async-hello'); + }); +}); + +test('execute operation', async (t) => { + const { createWorker, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + const { handler } = makeTestHandler(); + const worker = await createWorker({ nexusServices: [handler] }); + + await worker.runUntil(async () => { + const svc = t.context.env.client.nexus.createServiceClient({ endpoint: endpointName, service: testService }); + const result = await svc.executeOperation('echo', 'execute-test', { + id: 'op-' + randomUUID(), + scheduleToCloseTimeout: '10s', + }); + t.is(result.value, 'execute-test'); + }); +}); + +test('describe operation', async (t) => { + const { createWorker, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + const { handler } = makeTestHandler(); + const worker = await createWorker({ nexusServices: [handler] }); + + await worker.runUntil(async () => { + const svc = t.context.env.client.nexus.createServiceClient({ endpoint: endpointName, service: testService }); + const handle = await svc.startOperation(testService.operations.echo, 'describe-test', { + id: 'op-' + randomUUID(), + scheduleToCloseTimeout: '10s', + summary: 'test-summary', + }); + const desc = await handle.describe(); + t.is(desc.operationId, handle.operationId); + t.is(desc.endpoint, endpointName); + t.is(desc.service, testService.name); + t.is(desc.operation, 'echo'); + t.truthy(desc.scheduleTime); + t.is(await desc.staticSummary(), 'test-summary'); + }); +}); + +test('cancel operation', async (t) => { + const { createWorker, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + const { handler } = makeTestHandler(); + const worker = await createWorker({ nexusServices: [handler] }); + + await worker.runUntil(async () => { + const svc = t.context.env.client.nexus.createServiceClient({ endpoint: endpointName, service: testService }); + const handle = await svc.startOperation( + testService.operations.blockingAsync, + { echo: 'cancel-test' }, + { + id: 'op-' + randomUUID(), + scheduleToCloseTimeout: '60s', + } + ); + await t.notThrowsAsync(handle.cancel('test cancellation')); + const err = await t.throwsAsync(handle.result(), { instanceOf: NexusOperationFailureError }); + t.true(err?.cause instanceof CancelledFailure); + }); +}); + +test('terminate operation', async (t) => { + const { createWorker, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + const { handler } = makeTestHandler(); + const worker = await createWorker({ nexusServices: [handler] }); + + await worker.runUntil(async () => { + const svc = t.context.env.client.nexus.createServiceClient({ endpoint: endpointName, service: testService }); + const handle = await svc.startOperation( + testService.operations.blockingAsync, + { echo: 'cancel-test' }, + { + id: 'op-' + randomUUID(), + scheduleToCloseTimeout: '60s', + } + ); + await t.notThrowsAsync(handle.terminate('test termination')); + const err = await t.throwsAsync(handle.result(), { instanceOf: NexusOperationFailureError }); + t.true(err?.cause instanceof TerminatedFailure); + }); +}); + +test('list operations', async (t) => { + const { createWorker, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + const { handler } = makeTestHandler(); + const worker = await createWorker({ nexusServices: [handler] }); + + await worker.runUntil(async () => { + const { client } = t.context.env; + const svc = client.nexus.createServiceClient({ endpoint: endpointName, service: testService }); + const opIds = new Set(); + for (let i = 0; i < 3; i++) { + const id = 'list-op-' + randomUUID(); + await svc.startOperation(testService.operations.echo, `list-${i}`, { + id, + scheduleToCloseTimeout: '10s', + }); + opIds.add(id); + } + const seen = new Set(); + for await (const op of client.nexus.list({ query: `Endpoint="${endpointName}"` })) { + seen.add(op.operationId); + if (seen.size >= 3) break; + } + let intersection = 0; + for (const id of opIds) if (seen.has(id)) intersection++; + t.true(intersection >= 1, `expected at least 1 of ${opIds.size} operations in list, saw ${intersection}`); + }); +}); + +test('count operations', async (t) => { + const { createWorker, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + const { handler } = makeTestHandler(); + const worker = await createWorker({ nexusServices: [handler] }); + + await worker.runUntil(async () => { + const { client } = t.context.env; + const svc = client.nexus.createServiceClient({ endpoint: endpointName, service: testService }); + await svc.startOperation(testService.operations.echo, 'count-test', { + id: 'count-op-' + randomUUID(), + scheduleToCloseTimeout: '10s', + }); + const result = await client.nexus.count(`Endpoint="${endpointName}"`); + t.true(result.count >= 1); + }); +}); + +test('get handle by ID', async (t) => { + const { createWorker, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + const { handler } = makeTestHandler(); + const worker = await createWorker({ nexusServices: [handler] }); + + await worker.runUntil(async () => { + const { client } = t.context.env; + const svc = client.nexus.createServiceClient({ endpoint: endpointName, service: testService }); + const id = 'handle-op-' + randomUUID(); + const handle1 = await svc.startOperation(testService.operations.echo, 'handle-test', { + id, + scheduleToCloseTimeout: '10s', + }); + const handle2 = client.nexus.getHandle(id, { runId: handle1.runId }); + const result = await handle2.result(); + t.is(result.value, 'handle-test'); + + // test that result() caches the value + const secondResult = await handle2.result(); + t.is(result, secondResult); + }); +}); + +test('ID conflict policy USE_EXISTING returns existing run', async (t) => { + const { createWorker, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + const { handler, workflowStarted } = makeTestHandler(); + const worker = await createWorker({ nexusServices: [handler] }); + + await worker.runUntil(async () => { + const svc = t.context.env.client.nexus.createServiceClient({ endpoint: endpointName, service: testService }); + const id = 'conflict-op-' + randomUUID(); + const wfId = `${id}-wf`; + const h1 = await svc.startOperation( + testService.operations.blockingAsync, + { echo: 'first', wfId }, + { + id, + scheduleToCloseTimeout: '60s', + } + ); + const h2 = await svc.startOperation( + testService.operations.blockingAsync, + { echo: 'first', wfId }, + { + id, + scheduleToCloseTimeout: '60s', + idConflictPolicy: NexusOperationIdConflictPolicy.USE_EXISTING, + } + ); + t.is(h1.operationId, h2.operationId); + t.is(h1.runId, h2.runId); + + // wait for workflow to start + await workflowStarted; + + // unblock workflow + const wfHandle = t.context.env.client.workflow.getHandle(wfId); + await wfHandle.executeUpdate(unblockEcho); + + // get result from both handles + const h1Result = await h1.result(); + const h2Result = await h2.result(); + + t.is(h1Result, 'first'); + t.is(h1Result, h2Result); + }); +}); + +test('ID reuse policy REJECT_DUPLICATE after close throws AlreadyStartedError', async (t) => { + const { createWorker, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + const { handler } = makeTestHandler(); + const worker = await createWorker({ nexusServices: [handler] }); + + await worker.runUntil(async () => { + const svc = t.context.env.client.nexus.createServiceClient({ endpoint: endpointName, service: testService }); + const id = 'reuse-op-' + randomUUID(); + const h1 = await svc.startOperation(testService.operations.echo, 'first', { + id, + scheduleToCloseTimeout: '10s', + }); + await h1.result(); + await t.throwsAsync( + svc.startOperation(testService.operations.echo, 'second', { + id, + scheduleToCloseTimeout: '10s', + idReusePolicy: NexusOperationIdReusePolicy.REJECT_DUPLICATE, + }), + { instanceOf: NexusOperationAlreadyStartedError } + ); + }); +}); + +test('failure propagation', async (t) => { + const { createWorker, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + const { handler } = makeTestHandler(); + const worker = await createWorker({ nexusServices: [handler] }); + + await worker.runUntil(async () => { + const svc = t.context.env.client.nexus.createServiceClient({ endpoint: endpointName, service: testService }); + const err = await t.throwsAsync( + svc.executeOperation(testService.operations.fail, 'application', { + id: 'fail-op-' + randomUUID(), + scheduleToCloseTimeout: '10s', + }), + { instanceOf: NexusOperationFailureError } + ); + t.true(err?.cause instanceof nexus.HandlerError); + t.true(err?.cause.cause instanceof ApplicationFailure); + + const handle = await svc.startOperation(testService.operations.fail, 'handler', { + id: 'fail-op-' + randomUUID(), + scheduleToCloseTimeout: '10s', + }); + const handleErr = await t.throwsAsync(handle.result(), { instanceOf: NexusOperationFailureError }); + t.true(handleErr?.cause instanceof nexus.HandlerError); + + // test that result() caches the failure + const secondHandleErr = await t.throwsAsync(handle.result(), { instanceOf: NexusOperationFailureError }); + t.is(handleErr, secondHandleErr); + }); +}); + +test('interceptor integration', async (t) => { + const { createWorker, registerNexusEndpoint } = helpers(t); + const { endpointName } = await registerNexusEndpoint(); + const { handler } = makeTestHandler(); + const worker = await createWorker({ nexusServices: [handler] }); + + await worker.runUntil(async () => { + const calls: string[] = []; + const interceptor: NexusClientInterceptor = { + async startOperation(input: StartNexusOperationInput, next) { + calls.push('start'); + return await next(input); + }, + async getResult(input: GetNexusOperationResultInput, next) { + calls.push('getResult'); + return await next(input); + }, + async describe(input: DescribeNexusOperationInput, next) { + calls.push('describe'); + return await next(input); + }, + async cancel(input: CancelNexusOperationInput, next) { + calls.push('cancel'); + return await next(input); + }, + async terminate(input: TerminateNexusOperationInput, next) { + calls.push('terminate'); + return await next(input); + }, + list(input: ListNexusOperationsInput, next) { + calls.push('list'); + return next(input); + }, + async count(input: CountNexusOperationsInput, next) { + calls.push('count'); + return await next(input); + }, + }; + + const { env } = t.context; + const client = new Client({ + connection: env.connection, + namespace: env.namespace, + interceptors: { nexus: [interceptor] }, + }); + const svc = client.nexus.createServiceClient({ endpoint: endpointName, service: testService }); + + const id = 'interceptor-op-' + randomUUID(); + const handle = await svc.startOperation(testService.operations.echo, 'hello', { + id, + scheduleToCloseTimeout: '10s', + }); + await handle.result(); + await handle.describe(); + + const handle2 = await svc.startOperation( + testService.operations.blockingAsync, + { echo: 'hello-2' }, + { + id: 'interceptor-op2-' + randomUUID(), + scheduleToCloseTimeout: '60s', + } + ); + await handle2.cancel('interceptor test'); + + const handle3 = await svc.startOperation( + testService.operations.blockingAsync, + { echo: 'hello-2' }, + { + id: 'interceptor-op3-' + randomUUID(), + scheduleToCloseTimeout: '60s', + } + ); + await handle3.terminate('interceptor test'); + for await (const _ of client.nexus.list({ query: `Endpoint="${endpointName}"` })) break; + await client.nexus.count(`Endpoint="${endpointName}"`); + + t.true(calls.includes('start'), 'start called'); + t.true(calls.includes('getResult'), 'getResult called'); + t.true(calls.includes('describe'), 'describe called'); + t.true(calls.includes('cancel'), 'cancel called'); + t.true(calls.includes('terminate'), 'terminate called'); + t.true(calls.includes('list'), 'list called'); + t.true(calls.includes('count'), 'count called'); + }); +}); From dbd43ed1fbd464f3343db6a1d246f78f2fa1505b Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Fri, 24 Apr 2026 13:07:31 -0700 Subject: [PATCH 2/4] remove hallucinated resultType --- packages/client/src/interceptors.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/client/src/interceptors.ts b/packages/client/src/interceptors.ts index 9bd7449e2..2e8e2ddfd 100644 --- a/packages/client/src/interceptors.ts +++ b/packages/client/src/interceptors.ts @@ -297,8 +297,6 @@ export interface StartNexusOperationInput { export interface GetNexusOperationResultInput { readonly operationId: string; readonly runId?: string; - /** Type hint for the deserialized result (used by interceptors/codecs). */ - readonly resultType?: unknown; } /** Input for {@link NexusClientInterceptor.describe}. */ From 414011d8c54ae42312b23f98be2b48adf306a640 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Fri, 24 Apr 2026 15:45:44 -0700 Subject: [PATCH 3/4] Address findings/suggestions from claude. Cache promise for getResult and decodeMetadata to avoid duplicate work. --- packages/client/src/nexus-client.ts | 54 ++++++++++------------ packages/test/src/test-nexus-standalone.ts | 25 +++++++--- 2 files changed, 44 insertions(+), 35 deletions(-) diff --git a/packages/client/src/nexus-client.ts b/packages/client/src/nexus-client.ts index f447c16cf..de75333b7 100644 --- a/packages/client/src/nexus-client.ts +++ b/packages/client/src/nexus-client.ts @@ -164,10 +164,7 @@ export interface NexusServiceClient { ): Promise>; } -type CachedOperationResult = - | { kind: 'pending' } - | { kind: 'complete'; value: T } - | { kind: 'failed'; failure: NexusOperationFailureError }; +type CachedOperationResult = { kind: 'pending' } | { kind: 'requested'; value: Promise }; /** * Client for standalone Nexus operations. Access via {@link Client.nexus}. @@ -372,24 +369,15 @@ export class NexusClient extends BaseClient { async result(): Promise { switch (cachedResult.kind) { case 'pending': { - try { - const result = await this.client._getNexusOperationResult({ - operationId: this.operationId, - runId: this.runId, - }); - cachedResult = { kind: 'complete', value: result as O }; - return cachedResult.value; - } catch (err) { - if (err instanceof NexusOperationFailureError) { - cachedResult = { kind: 'failed', failure: err }; - } - throw err; - } + const resultPromise = this.client._getNexusOperationResult({ + operationId: this.operationId, + runId: this.runId, + }) as Promise; + cachedResult = { kind: 'requested', value: resultPromise }; + return await cachedResult.value; } - case 'complete': - return cachedResult.value; - case 'failed': - throw cachedResult.failure; + case 'requested': + return await cachedResult.value; } }, async describe(options?: DescribeNexusOperationOptions): Promise { @@ -603,19 +591,27 @@ async function nexusOperationExecutionDescriptionFromProto( longPollToken: Uint8Array | undefined ): Promise { let decodedMetadata: - | { state: 'resolved'; summary: string | undefined; details: string | undefined } + | { state: 'requested'; metadata: Promise<{ summary: string | undefined; details: string | undefined }> } | { state: 'pending' } = { state: 'pending', }; const decodeMetadata = async () => { if (decodedMetadata.state === 'pending') { + const metadataPromise = Promise.all([ + decodeOptionalSinglePayload(dataConverter, raw.userMetadata?.summary), + decodeOptionalSinglePayload(dataConverter, raw.userMetadata?.details), + ]).then(([summary, details]) => { + return { + summary: (summary ?? undefined) as string | undefined, + details: (details ?? undefined) as string | undefined, + }; + }); decodedMetadata = { - state: 'resolved', - summary: (await decodeOptionalSinglePayload(dataConverter, raw.userMetadata?.summary)) ?? undefined, - details: (await decodeOptionalSinglePayload(dataConverter, raw.userMetadata?.details)) ?? undefined, + state: 'requested', + metadata: metadataPromise, }; } - return decodedMetadata; + return await decodedMetadata.metadata; }; return { operationId: raw.operationId ?? '', @@ -697,7 +693,7 @@ export class NexusOperationFailureError extends Error { /** * Thrown by {@link NexusServiceClient.startOperation} when the server returns ALREADY_EXISTS - * because an operation with the given ID already exists (and the reuse/conflict policies disallow reuse. + * because an operation with the given ID already exists and the reuse/conflict policies disallow reuse. */ @SymbolBasedInstanceOfError('NexusOperationAlreadyStartedError') export class NexusOperationAlreadyStartedError extends Error { @@ -705,7 +701,7 @@ export class NexusOperationAlreadyStartedError extends Error { public readonly operationId: string, public readonly runId?: string ) { - super(`Nexus operation already started: operation_id=${operationId} run_id=${runId}`); + super(`Nexus operation already started: operationId=${operationId} ${runId ? `runId=${runId}` : ''}`); } } @@ -718,6 +714,6 @@ export class NexusOperationNotFoundError extends Error { public readonly operationId: string, public readonly runId?: string ) { - super(`Nexus operation not found: operation_id=${operationId} run_id=${runId}`); + super(`Nexus operation not found: operationId=${operationId} ${runId ? `runId=${runId}` : ''}`); } } diff --git a/packages/test/src/test-nexus-standalone.ts b/packages/test/src/test-nexus-standalone.ts index 3b32bbfa2..dad28530b 100644 --- a/packages/test/src/test-nexus-standalone.ts +++ b/packages/test/src/test-nexus-standalone.ts @@ -101,17 +101,30 @@ test('start sync operation and get result', async (t) => { test('start async operation and poll result', async (t) => { const { createWorker, registerNexusEndpoint } = helpers(t); const { endpointName } = await registerNexusEndpoint(); - const { handler } = makeTestHandler(); + const { handler, workflowStarted } = makeTestHandler(); const worker = await createWorker({ nexusServices: [handler] }); await worker.runUntil(async () => { const svc = t.context.env.client.nexus.createServiceClient({ endpoint: endpointName, service: testService }); - const handle = await svc.startOperation(testService.operations.echo, 'async-hello', { - id: 'op-' + randomUUID(), - scheduleToCloseTimeout: '10s', - }); + const id = randomUUID(); + const handle = await svc.startOperation( + testService.operations.blockingAsync, + { echo: 'async-hello', wfId: id }, + { + id: `op-${id}`, + scheduleToCloseTimeout: '10s', + } + ); + + // wait for workflow to start + await workflowStarted; + + // unblock workflow + const wfHandle = t.context.env.client.workflow.getHandle(id); + await wfHandle.executeUpdate(unblockEcho); + const result = await handle.result(); - t.is(result.value, 'async-hello'); + t.is(result, 'async-hello'); }); }); From 48a714f97a131a0e16d5549299b45d92330591e3 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Fri, 24 Apr 2026 16:35:10 -0700 Subject: [PATCH 4/4] Small type cleanup --- packages/client/src/nexus-client.ts | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/packages/client/src/nexus-client.ts b/packages/client/src/nexus-client.ts index de75333b7..6caed2cba 100644 --- a/packages/client/src/nexus-client.ts +++ b/packages/client/src/nexus-client.ts @@ -164,7 +164,7 @@ export interface NexusServiceClient { ): Promise>; } -type CachedOperationResult = { kind: 'pending' } | { kind: 'requested'; value: Promise }; +type CachedPromise = { state: 'pending' } | { state: 'requested'; value: Promise }; /** * Client for standalone Nexus operations. Access via {@link Client.nexus}. @@ -361,19 +361,19 @@ export class NexusClient extends BaseClient { } protected _createNexusOperationHandle(opts: { operationId: string; runId?: string }): NexusOperationHandle { - let cachedResult: CachedOperationResult = { kind: 'pending' }; + let cachedResult: CachedPromise = { state: 'pending' }; return { operationId: opts.operationId, runId: opts.runId, client: this, async result(): Promise { - switch (cachedResult.kind) { + switch (cachedResult.state) { case 'pending': { const resultPromise = this.client._getNexusOperationResult({ operationId: this.operationId, runId: this.runId, }) as Promise; - cachedResult = { kind: 'requested', value: resultPromise }; + cachedResult = { state: 'requested', value: resultPromise }; return await cachedResult.value; } case 'requested': @@ -590,9 +590,7 @@ async function nexusOperationExecutionDescriptionFromProto( dataConverter: LoadedDataConverter, longPollToken: Uint8Array | undefined ): Promise { - let decodedMetadata: - | { state: 'requested'; metadata: Promise<{ summary: string | undefined; details: string | undefined }> } - | { state: 'pending' } = { + let decodedMetadata: CachedPromise<{ summary: string | undefined; details: string | undefined }> = { state: 'pending', }; const decodeMetadata = async () => { @@ -608,10 +606,10 @@ async function nexusOperationExecutionDescriptionFromProto( }); decodedMetadata = { state: 'requested', - metadata: metadataPromise, + value: metadataPromise, }; } - return await decodedMetadata.metadata; + return await decodedMetadata.value; }; return { operationId: raw.operationId ?? '',