Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"@temporalio/proto": "workspace:*",
"abort-controller": "^3.0.0",
"long": "^5.2.3",
"nexus-rpc": "^0.0.2",
"uuid": "^11.1.0"
},
"devDependencies": {
Expand Down
15 changes: 15 additions & 0 deletions packages/client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { AsyncCompletionClient } from './async-completion-client';
import type { BaseClientOptions, LoadedWithDefaults } from './base-client';
import { BaseClient, defaultBaseClientOptions } from './base-client';
import type { ClientInterceptors } from './interceptors';
import { NexusClient } from './nexus-client';
import { ScheduleClient } from './schedule-client';
import type { QueryRejectCondition, WorkflowService } from './types';
import { WorkflowClient } from './workflow-client';
Expand Down Expand Up @@ -62,6 +63,12 @@ export class Client extends BaseClient {
* @experimental The Worker Versioning API is still being designed. Major changes are expected.
*/
public readonly taskQueue: TaskQueueClient;
/**
* Nexus sub-client - use to start and interact with standalone Nexus operations.
*
* @experimental Standalone Nexus operations are a new API and susceptible to change.
*/
public readonly nexus: NexusClient;

constructor(options?: ClientOptions) {
options = options ?? {};
Expand Down Expand Up @@ -108,13 +115,21 @@ export class Client extends BaseClient {
dataConverter: this.dataConverter,
});

this.nexus = new NexusClient({
...commonOptions,
connection: this.connection,
dataConverter: this.dataConverter,
interceptors: interceptors?.nexus,
});

this.options = {
...defaultBaseClientOptions(),
...filterNullAndUndefined(commonOptions),
loadedDataConverter: this.dataConverter,
interceptors: {
workflow: this.workflow.options.interceptors,
schedule: this.schedule.options.interceptors,
nexus: this.nexus.options.interceptors,
},
workflow: {
queryRejectCondition: this.workflow.options.queryRejectCondition,
Expand Down
21 changes: 21 additions & 0 deletions packages/client/src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,24 @@ function getGrpcStatusDetails(err: GrpcServiceError): google.rpc.Status['details
}
return google.rpc.Status.decode(statusBuffer).details;
}

/**
* Try to extract the runId field from a gRPC ALREADY_EXISTS error's details containing
* NexusOperationExecutionAlreadyStartedFailure. Returns undefined if the error details
* do not include this failure type or cannot be decoded.
*/
export function extractNexusOperationAlreadyStartedRunId(err: GrpcServiceError): string | undefined {
try {
for (const entry of getGrpcStatusDetails(err) ?? []) {
if (!entry.type_url || !entry.value) continue;
const type = entry.type_url.replace(/^type.googleapis.com\//, '') as ErrorDetailsName;
if (type !== 'temporal.api.errordetails.v1.NexusOperationExecutionAlreadyStartedFailure') continue;

const details = temporal.api.errordetails.v1.NexusOperationExecutionAlreadyStartedFailure.decode(entry.value);
return details.runId;
}
} catch {
// ignore
}
return undefined;
}
2 changes: 2 additions & 0 deletions packages/client/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ export * from './workflow-options';
export * from './schedule-types';
export * from './schedule-client';
export * from './task-queue-client';
export * from './nexus-types';
export * from './nexus-client';
export { WorkflowUpdateStage } from './workflow-update-stage';
export {
WorkerBuildIdVersionSets,
Expand Down
98 changes: 96 additions & 2 deletions packages/client/src/interceptors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,17 @@
* @module
*/

import type { Duration, SearchAttributePair, TypedSearchAttributes } from '@temporalio/common';
import { Headers, Next } from '@temporalio/common';
import type { temporal } from '@temporalio/proto';
import type { NexusOperationHandle } from './nexus-client';
import type {
NexusOperationExecutionCount,
NexusOperationExecutionDescription,
NexusOperationExecution,
NexusOperationIdConflictPolicy,
NexusOperationIdReusePolicy,
} from './nexus-types';
import type { CompiledScheduleOptions } from './schedule-types';
import type {
DescribeWorkflowExecutionResponse,
Expand Down Expand Up @@ -237,14 +246,99 @@ export type CreateScheduleOutput = {
readonly conflictToken: Uint8Array;
};

/**
* Implement any of these methods to intercept NexusClient outbound calls.
*/
export interface NexusClientInterceptor {
/** Intercept a call to {@link NexusServiceClient.startOperation}. */
startOperation?: (
input: StartNexusOperationInput,
next: Next<this, 'startOperation'>
) => Promise<NexusOperationHandle<unknown>>;

/** Intercept {@link NexusOperationHandle.result}. */
getResult?: (input: GetNexusOperationResultInput, next: Next<this, 'getResult'>) => Promise<unknown>;

/** Intercept {@link NexusOperationHandle.describe}. */
describe?: (
input: DescribeNexusOperationInput,
next: Next<this, 'describe'>
) => Promise<NexusOperationExecutionDescription>;

/** Intercept {@link NexusOperationHandle.cancel}. */
cancel?: (input: CancelNexusOperationInput, next: Next<this, 'cancel'>) => Promise<void>;

/** Intercept {@link NexusOperationHandle.terminate}. */
terminate?: (input: TerminateNexusOperationInput, next: Next<this, 'terminate'>) => Promise<void>;

/** Intercept {@link NexusClient.list}. */
list?: (input: ListNexusOperationsInput, next: Next<this, 'list'>) => AsyncIterable<NexusOperationExecution>;

/** Intercept {@link NexusClient.count}. */
count?: (input: CountNexusOperationsInput, next: Next<this, 'count'>) => Promise<NexusOperationExecutionCount>;
}

/** Input for {@link NexusClientInterceptor.startOperation}. */
export interface StartNexusOperationInput {
readonly endpoint: string;
readonly service: string;
readonly operation: string;
readonly id: string;
readonly arg: unknown;
readonly scheduleToCloseTimeout?: Duration;
readonly summary?: string;
readonly idReusePolicy?: NexusOperationIdReusePolicy;
readonly idConflictPolicy?: NexusOperationIdConflictPolicy;
readonly searchAttributes?: SearchAttributePair[] | TypedSearchAttributes;
readonly headers?: Record<string, string>;
}

/** Input for {@link NexusClientInterceptor.getResult}. */
export interface GetNexusOperationResultInput {
readonly operationId: string;
readonly runId?: string;
}

/** Input for {@link NexusClientInterceptor.describe}. */
export interface DescribeNexusOperationInput {
readonly operationId: string;
readonly runId?: string;
readonly longPollToken?: Uint8Array;
}

/** Input for {@link NexusClientInterceptor.cancel}. */
export interface CancelNexusOperationInput {
readonly operationId: string;
readonly runId?: string;
readonly reason?: string;
}

/** Input for {@link NexusClientInterceptor.terminate}. */
export interface TerminateNexusOperationInput {
readonly operationId: string;
readonly runId?: string;
readonly reason?: string;
}

/** Input for {@link NexusClientInterceptor.list}. */
export interface ListNexusOperationsInput {
readonly query?: string;
readonly pageSize?: number;
}

/** Input for {@link NexusClientInterceptor.count}. */
export interface CountNexusOperationsInput {
readonly query?: string;
}

/**
* Interceptors for any high-level SDK client.
*
* NOTE: Currently only for {@link WorkflowClient} and {@link ScheduleClient}. More will be added later as needed.
*/
export interface ClientInterceptors {
// eslint-disable-next-line @typescript-eslint/no-deprecated
workflow?: WorkflowClientInterceptors | WorkflowClientInterceptor[];

schedule?: ScheduleClientInterceptor[];

nexus?: NexusClientInterceptor[];
}
Loading
Loading