Skip to content

Commit 0376f94

Browse files
authored
Add tags support for activities, sub-orchestrations, and client APIs (#89)
1 parent 1441207 commit 0376f94

12 files changed

Lines changed: 232 additions & 13 deletions

File tree

packages/durabletask-js/src/client/client.ts

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ import { FailureDetails } from "../task/failure-details";
2424
import { HistoryEvent } from "../orchestration/history-event";
2525
import { convertProtoHistoryEvent } from "../utils/history-event-converter";
2626
import { Logger, ConsoleLogger } from "../types/logger.type";
27+
import { StartOrchestrationOptions } from "../task/options";
28+
import { mapToRecord } from "../utils/tags.util";
29+
import { populateTagsMap } from "../utils/pb-helper.util";
2730

2831
// Re-export MetadataGenerator for backward compatibility
2932
export { MetadataGenerator } from "../utils/grpc-helper.util";
@@ -135,13 +138,46 @@ export class TaskHubGrpcClient {
135138
input?: TInput,
136139
instanceId?: string,
137140
startAt?: Date,
141+
): Promise<string>;
142+
/**
143+
* Schedules a new orchestrator using the DurableTask client.
144+
*
145+
* @param {TOrchestrator | string} orchestrator - The orchestrator or the name of the orchestrator to be scheduled.
146+
* @param {TInput} input - Optional input for the orchestrator.
147+
* @param {StartOrchestrationOptions} options - Options for instance ID, start time, and tags.
148+
* @return {Promise<string>} A Promise resolving to the unique ID of the scheduled orchestrator instance.
149+
*/
150+
async scheduleNewOrchestration(
151+
orchestrator: TOrchestrator | string,
152+
input?: TInput,
153+
options?: StartOrchestrationOptions,
154+
): Promise<string>;
155+
async scheduleNewOrchestration(
156+
orchestrator: TOrchestrator | string,
157+
input?: TInput,
158+
instanceIdOrOptions?: string | StartOrchestrationOptions,
159+
startAt?: Date,
138160
): Promise<string> {
139161
let name;
140162
if (typeof orchestrator === "string") {
141163
name = orchestrator;
142164
} else {
143165
name = getName(orchestrator);
144166
}
167+
168+
const instanceId =
169+
typeof instanceIdOrOptions === "string" || instanceIdOrOptions === undefined
170+
? instanceIdOrOptions
171+
: instanceIdOrOptions.instanceId;
172+
const scheduledStartAt =
173+
typeof instanceIdOrOptions === "string" || instanceIdOrOptions === undefined
174+
? startAt
175+
: instanceIdOrOptions.startAt;
176+
const tags =
177+
typeof instanceIdOrOptions === "string" || instanceIdOrOptions === undefined
178+
? undefined
179+
: instanceIdOrOptions.tags;
180+
145181
const req = new pb.CreateInstanceRequest();
146182
req.setName(name);
147183
req.setInstanceid(instanceId ?? randomUUID());
@@ -150,11 +186,13 @@ export class TaskHubGrpcClient {
150186
i.setValue(JSON.stringify(input));
151187

152188
const ts = new Timestamp();
153-
ts.fromDate(new Date(startAt?.getTime() ?? 0));
189+
ts.fromDate(new Date(scheduledStartAt?.getTime() ?? 0));
154190

155191
req.setInput(i);
156192
req.setScheduledstarttimestamp(ts);
157193

194+
populateTagsMap(req.getTagsMap(), tags);
195+
158196
this._logger.info(`Starting new ${name} instance with ID = ${req.getInstanceid()}`);
159197

160198
const res = await callWithMetadata<pb.CreateInstanceRequest, pb.CreateInstanceResponse>(
@@ -569,14 +607,15 @@ export class TaskHubGrpcClient {
569607
* @example
570608
* ```typescript
571609
* // Iterate over all matching instances
610+
* const logger = new ConsoleLogger();
572611
* const pageable = client.getAllInstances({ statuses: [OrchestrationStatus.COMPLETED] });
573612
* for await (const instance of pageable) {
574-
* console.log(instance.instanceId);
613+
* logger.info(instance.instanceId);
575614
* }
576615
*
577616
* // Iterate over pages
578617
* for await (const page of pageable.asPages()) {
579-
* console.log(`Page has ${page.values.length} items`);
618+
* logger.info(`Page has ${page.values.length} items`);
580619
* }
581620
* ```
582621
*
@@ -852,6 +891,8 @@ export class TaskHubGrpcClient {
852891
);
853892
}
854893

894+
const tags = mapToRecord(protoState.getTagsMap());
895+
855896
return new OrchestrationState(
856897
instanceId,
857898
name ?? "",
@@ -862,6 +903,7 @@ export class TaskHubGrpcClient {
862903
serializedOutput,
863904
serializedCustomStatus,
864905
failureDetails,
906+
tags,
865907
);
866908
}
867909
}

packages/durabletask-js/src/index.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,13 @@ export { Task } from "./task/task";
6565

6666
// Retry policies and task options
6767
export { RetryPolicy, RetryPolicyOptions } from "./task/retry";
68-
export { TaskOptions, SubOrchestrationOptions, taskOptionsFromRetryPolicy, subOrchestrationOptionsFromRetryPolicy } from "./task/options";
68+
export {
69+
TaskOptions,
70+
SubOrchestrationOptions,
71+
StartOrchestrationOptions,
72+
taskOptionsFromRetryPolicy,
73+
subOrchestrationOptionsFromRetryPolicy,
74+
} from "./task/options";
6975

7076
// Types
7177
export { TOrchestrator } from "./types/orchestrator.type";

packages/durabletask-js/src/orchestration/index.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import * as pb from "../proto/orchestrator_service_pb";
55
import { FailureDetails } from "../task/failure-details";
66
import { fromProtobuf } from "./enum/orchestration-status.enum";
77
import { OrchestrationState } from "./orchestration-state";
8+
import { mapToRecord } from "../utils/tags.util";
89

910
export function newOrchestrationState(
1011
instanceId: string,
@@ -43,6 +44,8 @@ export function newOrchestrationState(
4344
tsUpdatedParsed = new Date(tsUpdated.getSeconds() * 1000 + tsUpdated.getNanos() / 1000000);
4445
}
4546

47+
const tags = mapToRecord(state?.getTagsMap());
48+
4649
return new OrchestrationState(
4750
instanceId,
4851
state?.getName() ?? "",
@@ -53,5 +56,6 @@ export function newOrchestrationState(
5356
state?.getOutput()?.toString(),
5457
state?.getCustomstatus()?.toString(),
5558
failureDetails,
59+
tags,
5660
);
5761
}

packages/durabletask-js/src/orchestration/orchestration-state.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ export class OrchestrationState {
1515
serializedOutput?: string;
1616
serializedCustomStatus?: string;
1717
failureDetails?: FailureDetails;
18+
tags?: Record<string, string>;
1819

1920
constructor(
2021
instanceId: string,
@@ -26,6 +27,7 @@ export class OrchestrationState {
2627
serializedOutput?: string,
2728
serializedCustomStatus?: string,
2829
failureDetails?: FailureDetails,
30+
tags?: Record<string, string>,
2931
) {
3032
this.instanceId = instanceId;
3133
this.name = name;
@@ -36,6 +38,7 @@ export class OrchestrationState {
3638
this.serializedOutput = serializedOutput;
3739
this.serializedCustomStatus = serializedCustomStatus;
3840
this.failureDetails = failureDetails;
41+
this.tags = tags;
3942
}
4043

4144
raiseIfFailed(): void {

packages/durabletask-js/src/task/options/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
export {
55
TaskOptions,
66
SubOrchestrationOptions,
7+
StartOrchestrationOptions,
78
taskOptionsFromRetryPolicy,
89
subOrchestrationOptionsFromRetryPolicy,
910
} from "./task-options";

packages/durabletask-js/src/task/options/task-options.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ export interface TaskOptions {
1212
* Controls how many times a task is retried and the delay between retries.
1313
*/
1414
retry?: RetryPolicy;
15+
/**
16+
* The tags to associate with the task.
17+
*/
18+
tags?: Record<string, string>;
1519
}
1620

1721
/**
@@ -26,6 +30,26 @@ export interface SubOrchestrationOptions extends TaskOptions {
2630
instanceId?: string;
2731
}
2832

33+
/**
34+
* Options for scheduling new orchestrations via the client.
35+
*/
36+
export interface StartOrchestrationOptions {
37+
/**
38+
* The unique ID to use for the orchestration instance.
39+
* If not specified, a random UUID will be generated.
40+
*/
41+
instanceId?: string;
42+
/**
43+
* The time when the orchestration should start executing.
44+
* If not specified or in the past, it will start immediately.
45+
*/
46+
startAt?: Date;
47+
/**
48+
* The tags to associate with the orchestration instance.
49+
*/
50+
tags?: Record<string, string>;
51+
}
52+
2953
/**
3054
* Creates a TaskOptions instance from a RetryPolicy.
3155
*

packages/durabletask-js/src/utils/pb-helper.util.ts

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,30 @@ export function getStringValue(val?: string): StringValue | undefined {
243243
return stringValue;
244244
}
245245

246+
/**
247+
* Populates a tag map with the provided tags.
248+
*
249+
* Copies all key-value pairs from the optional {@link tags} object into the given
250+
* {@link tagsMap} by invoking its `set` method for each entry. If no tags are
251+
* provided, this function is a no-op.
252+
*
253+
* @param tagsMap - A map-like object that exposes a `set(key, value)` method used
254+
* to store tag key-value pairs.
255+
* @param tags - An optional record of tag key-value pairs to add to the map.
256+
*/
257+
export function populateTagsMap(
258+
tagsMap: { set: (key: string, value: string) => void },
259+
tags?: Record<string, string>,
260+
): void {
261+
if (!tags) {
262+
return;
263+
}
264+
265+
for (const [key, value] of Object.entries(tags)) {
266+
tagsMap.set(key, value);
267+
}
268+
}
269+
246270
export function newCompleteOrchestrationAction(
247271
id: number,
248272
status: pb.OrchestrationStatus,
@@ -277,10 +301,16 @@ export function newCreateTimerAction(id: number, fireAt: Date): pb.OrchestratorA
277301
return action;
278302
}
279303

280-
export function newScheduleTaskAction(id: number, name: string, encodedInput?: string): pb.OrchestratorAction {
304+
export function newScheduleTaskAction(
305+
id: number,
306+
name: string,
307+
encodedInput?: string,
308+
tags?: Record<string, string>,
309+
): pb.OrchestratorAction {
281310
const scheduleTaskAction = new pb.ScheduleTaskAction();
282311
scheduleTaskAction.setName(name);
283312
scheduleTaskAction.setInput(getStringValue(encodedInput));
313+
populateTagsMap(scheduleTaskAction.getTagsMap(), tags);
284314

285315
const action = new pb.OrchestratorAction();
286316
action.setId(id);
@@ -300,11 +330,13 @@ export function newCreateSubOrchestrationAction(
300330
name: string,
301331
instanceId?: string | null,
302332
encodedInput?: string,
333+
tags?: Record<string, string>,
303334
): pb.OrchestratorAction {
304335
const createSubOrchestrationAction = new pb.CreateSubOrchestrationAction();
305336
createSubOrchestrationAction.setName(name);
306337
createSubOrchestrationAction.setInstanceid(instanceId || "");
307338
createSubOrchestrationAction.setInput(getStringValue(encodedInput));
339+
populateTagsMap(createSubOrchestrationAction.getTagsMap(), tags);
308340

309341
const action = new pb.OrchestratorAction();
310342
action.setId(id);
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
type TagsMapLike = {
5+
forEach: (callback: (value: string, key: string) => void) => void;
6+
};
7+
8+
/**
9+
* Converts a map-like collection of tag key/value pairs into a plain record object.
10+
*
11+
* @param tagsMap - A map-like object containing tag values keyed by tag name.
12+
* @returns A record containing the tag key/value pairs, or `undefined` if the input
13+
* is `undefined` or if no tags are present.
14+
*/
15+
export function mapToRecord(tagsMap?: TagsMapLike): Record<string, string> | undefined {
16+
if (!tagsMap) {
17+
return;
18+
}
19+
20+
const tags: Record<string, string> = {};
21+
tagsMap.forEach((value, key) => {
22+
tags[key] = value;
23+
});
24+
25+
return Object.keys(tags).length > 0 ? tags : undefined;
26+
}

packages/durabletask-js/src/worker/runtime-orchestration-context.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import { TActivity } from "../types/activity.type";
1414
import { TOrchestrator } from "../types/orchestrator.type";
1515
import { Task } from "../task/task";
1616
import { StopIterationError } from "./exception/stop-iteration-error";
17+
import { mapToRecord } from "../utils/tags.util";
1718

1819
export class RuntimeOrchestrationContext extends OrchestrationContext {
1920
_generator?: Generator<Task<any>, any, any>;
@@ -262,7 +263,7 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {
262263
const id = this.nextSequenceNumber();
263264
const name = typeof activity === "string" ? activity : getName(activity);
264265
const encodedInput = input ? JSON.stringify(input) : undefined;
265-
const action = ph.newScheduleTaskAction(id, name, encodedInput);
266+
const action = ph.newScheduleTaskAction(id, name, encodedInput, options?.tags);
266267
this._pendingActions[action.getId()] = action;
267268

268269
// If a retry policy is provided, create a RetryableTask
@@ -306,7 +307,7 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {
306307
}
307308

308309
const encodedInput = input ? JSON.stringify(input) : undefined;
309-
const action = ph.newCreateSubOrchestrationAction(id, name, instanceId, encodedInput);
310+
const action = ph.newCreateSubOrchestrationAction(id, name, instanceId, encodedInput, options?.tags);
310311
this._pendingActions[action.getId()] = action;
311312

312313
// If a retry policy is provided, create a RetryableTask
@@ -520,7 +521,8 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {
520521
}
521522
const name = scheduleTask.getName();
522523
const input = scheduleTask.getInput()?.getValue();
523-
newAction = ph.newScheduleTaskAction(newId, name, input);
524+
const tags = mapToRecord(scheduleTask.getTagsMap());
525+
newAction = ph.newScheduleTaskAction(newId, name, input, tags);
524526
} else {
525527
// Reschedule a sub-orchestration task
526528
const subOrch = originalAction.getCreatesuborchestration();
@@ -530,7 +532,8 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {
530532
const name = subOrch.getName();
531533
const instanceId = subOrch.getInstanceid();
532534
const input = subOrch.getInput()?.getValue();
533-
newAction = ph.newCreateSubOrchestrationAction(newId, name, instanceId, input);
535+
const tags = mapToRecord(subOrch.getTagsMap());
536+
newAction = ph.newCreateSubOrchestrationAction(newId, name, instanceId, input, tags);
534537
}
535538

536539
// Register the new action

0 commit comments

Comments
 (0)