Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 18 additions & 158 deletions packages/client/src/activity-client.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,13 @@
import { randomUUID } from 'node:crypto';
import { status as grpcStatus } from '@grpc/grpc-js';
import type {
ActivityFunction,
LoadedDataConverter,
Next,
Priority,
RetryPolicy,
SearchAttributePair,
TypedSearchAttributes,
} from '@temporalio/common';
import type { LoadedDataConverter, Next } from '@temporalio/common';
import {
compilePriority,
compileRetryPolicy,
convertDeploymentVersion,
decodePriority,
decompileRetryPolicy,
} from '@temporalio/common';
import type { Duration } from '@temporalio/common/lib/time';
import { msOptionalToTs, msToNumber, optionalTsToDate, optionalTsToMs } from '@temporalio/common/lib/time';
import { composeInterceptors } from '@temporalio/common/lib/interceptors';
import {
Expand All @@ -32,7 +23,7 @@ import {
encodeUserMetadata,
} from '@temporalio/common/lib/internal-non-workflow';
import { temporal } from '@temporalio/proto';
import type { Replace } from '@temporalio/common/lib/type-helpers';
import { filterNullAndUndefined } from '@temporalio/common/lib/internal-workflow';
import type {
ActivityCancelInput,
ActivityClientInterceptor,
Expand All @@ -45,13 +36,7 @@ import type {
} from './interceptors';
import type { AsyncCompletionClientOptions } from './async-completion-client';
import { AsyncCompletionClient } from './async-completion-client';
import type {
ActivityExecutionDescription,
ActivityExecutionInfo,
ActivityIdConflictPolicy,
ActivityIdReusePolicy,
CountActivityExecutions,
} from './types';
import type { ActivityExecutionDescription, ActivityExecutionInfo, CountActivityExecutions } from './types';
import {
decodeActivityExecutionStatus,
decodePendingActivityState,
Expand All @@ -67,6 +52,8 @@ import {
ActivityExecutionFailedError,
ActivityExecutionAlreadyStartedError,
} from './errors';
import { type InternalActivityStartOptions, InternalNexusStartOptionsSymbol } from './internal';
import type { ActivityOptions, ActivityOptionsFor, ActivityResult, ActivityName } from './activity-options';

/**
* Options used to configure {@link ActivityClient}
Expand Down Expand Up @@ -256,10 +243,17 @@ export class ActivityClient extends AsyncCompletionClient implements TypedActivi
}
validateActivityOptions(input.options);

const internalOptions = (input.options as InternalActivityStartOptions)[InternalNexusStartOptionsSymbol];

try {
const resp = await this.workflowService.startActivityExecution(
await this.buildStartActivityExecutionRequest(input)
);

if (internalOptions != null) {
internalOptions.backLink = resp.link ?? undefined;
}

return this.createHandle(input.options.id, resp.runId);
} catch (err) {
if (isGrpcServiceError(err) && err.code === grpcStatus.ALREADY_EXISTS) {
Expand Down Expand Up @@ -290,6 +284,8 @@ export class ActivityClient extends AsyncCompletionClient implements TypedActivi
? { indexedFields: encodeUnifiedSearchAttributes(undefined, input.options.typedSearchAttributes) }
: undefined;

const internalOptions = (input.options as InternalActivityStartOptions)[InternalNexusStartOptionsSymbol];

return {
namespace: this.options.namespace,
identity: this.options.identity,
Expand All @@ -310,6 +306,7 @@ export class ActivityClient extends AsyncCompletionClient implements TypedActivi
userMetadata: await encodeUserMetadata(this.dataConverter, input.options.summary, undefined),
priority: input.options.priority ? compilePriority(input.options.priority) : undefined,
startDelay: msOptionalToTs(input.options.startDelay),
...filterNullAndUndefined(internalOptions ?? {}),
};
}

Expand Down Expand Up @@ -362,7 +359,7 @@ export class ActivityClient extends AsyncCompletionClient implements TypedActivi
activityId: input.activityId,
runId: input.activityRunId || undefined,
});
return buildActivityDescription(resp.info!, this.dataConverter);
return buildActivityDescription(resp.info!, resp.callbacks, this.dataConverter);
} catch (err) {
this.rethrowGrpcError(err, 'Failed to describe activity');
}
Expand Down Expand Up @@ -495,77 +492,6 @@ export interface ActivityHandle<R = any> {
terminate(reason: string): Promise<void>;
}

/**
* Options used by {@link ActivityClient.start}.
*
* @experimental Standalone Activities are experimental. APIs may be subject to change.
*/
export interface ActivityOptions {
/**
* Activity ID of the started activity. It's recommended to use a meaningful business ID.
*/
id: string;
/**
* Task queue to run this activity on.
*/
taskQueue: string;
/**
* Input arguments to pass to the activity.
*/
args?: any[] | Readonly<any[]>;
/**
* If set, specifies maximum time between successful heartbeats.
*/
heartbeatTimeout?: Duration;
/**
* Controls how Activity is retried. If not set, the server will assign default retry policy.
*/
retry?: RetryPolicy;
/**
* Is set, specifies total time the activity is allowed to run, including retries.
*
* Note: it is required to set at least one of {@link startToCloseTimeout} and {@link scheduleToCloseTimeout}.
*/
startToCloseTimeout?: Duration;
/**
* If set, specifies maximum time the activity can wait in the task queue before being picked up by a worker.
* This timeout is non-retryable.
*/
scheduleToStartTimeout?: Duration;
/**
* If set, specifies maximum time for a single execution attempt. This timeout is retryable.
*
* Note: it is required to set at least one of {@link startToCloseTimeout} and {@link scheduleToCloseTimeout}.
*/
scheduleToCloseTimeout?: Duration;
/**
* A single-line fixed summary for this activity execution that may appear in UI/CLI.
* This can be in single-line Temporal markdown format.
*/
summary?: string;
/**
* Priority to use when starting this activity.
*/
priority?: Priority;
/**
* Time to wait before dispatching the first activity task. This delay is not applied to retry attempts.
*/
startDelay?: Duration;
/**
* Specifies behavior if there's a *closed* activity with the same ID.
*/
idReusePolicy?: ActivityIdReusePolicy;
/**
* Specifies behavior if there's a *running* activity with the same ID. Note that there can only be one running
* Activity for each Activity ID.
*/
idConflictPolicy?: ActivityIdConflictPolicy;
/**
* Search attributes for the activity.
*/
typedSearchAttributes?: SearchAttributePair[] | TypedSearchAttributes;
}

function validateActivityOptions(options: ActivityOptions): void {
if (!options.id) {
throw new TypeError('id is required');
Expand Down Expand Up @@ -606,6 +532,7 @@ function buildActivityExecutionInfo(info: temporal.api.activity.v1.IActivityExec

function buildActivityDescription(
info: temporal.api.activity.v1.IActivityExecutionInfo,
callbacks: temporal.api.activity.v1.ICallbackInfo[],
dataConverter: LoadedDataConverter
): ActivityExecutionDescription {
const getHeartbeatDetails: <T>() => Promise<T | undefined> = async <T>() => {
Expand All @@ -624,6 +551,7 @@ function buildActivityDescription(
return {
...buildActivityExecutionInfoCommonPart(info),
rawInfo: info,
rawCallbacks: callbacks,
runState: decodePendingActivityState(info.runState),
scheduleToCloseTimeoutMs: optionalTsToMs(info.scheduleToCloseTimeout),
scheduleToStartTimeoutMs: optionalTsToMs(info.scheduleToStartTimeout),
Expand Down Expand Up @@ -663,71 +591,3 @@ export interface TypedActivityClient<T> {

execute<N extends ActivityName<T>>(activity: N, options: ActivityOptionsFor<T, N>): Promise<ActivityResult<T, N>>;
}

/**
* Utility type to support strong typing in {@link TypedActivityClient}.
* Contains names of activities extracted from the specified activity interface.
* @template T Activity interface
*
* @experimental Standalone Activities are experimental. APIs may be subject to change.
*/
export type ActivityName<T> = {
[N in keyof T & string]: T[N] extends ActivityFunction<any, any> ? N : never;
}[keyof T & string];

/**
* Utility type to support strong typing in {@link TypedActivityClient}.
* Extracts argument types of an activity.
* @template T Activity interface
* @template N Activity name
*
* @experimental Standalone Activities are experimental. APIs may be subject to change.
*/
export type ActivityArgs<T, N extends ActivityName<T>> = T[N] extends ActivityFunction<infer P, any> ? P : never;

/**
* Utility type to support strong typing in {@link TypedActivityClient}.
* Extracts result type of an activity.
* @template T Activity interface
* @template N Activity name
*
* @experimental Standalone Activities are experimental. APIs may be subject to change.
*/
export type ActivityResult<T, N extends ActivityName<T>> = T[N] extends ActivityFunction<any, infer R> ? R : never;

/**
* Utility type to support strong typing in {@link TypedActivityClient}.
* Represents {@link ActivityOptions} with strongly typed arguments.
* @template Args Types of activity arguments as an array type.
*
* @experimental Standalone Activities are experimental. APIs may be subject to change.
*/
export type ActivityOptionsWithArgs<Args extends any[]> = Args extends [any, ...any]
? Replace<
ActivityOptions,
{
/**
* Arguments to pass to the Activity
*/
args: Args | Readonly<Args>;
}
>
: Replace<
ActivityOptions,
{
/**
* Arguments to pass to the Activity
*/
args?: Args | Readonly<Args>;
}
>;

/**
* Utility type to support strong typing in {@link TypedActivityClient}.
* Represents {@link ActivityOptions} with strongly typed arguments matching specified Activity in specified interface.
* @template T Activity interface
* @template N Activity name
*
* @experimental Standalone Activities are experimental. APIs may be subject to change.
*/
export type ActivityOptionsFor<T, N extends ActivityName<T>> = ActivityOptionsWithArgs<ActivityArgs<T, N>>;
Loading
Loading