Skip to content

Commit 1655fae

Browse files
committed
Add support for orchestration tags in TaskHubGrpcClient and related components
1 parent 032359a commit 1655fae

File tree

12 files changed

+231
-7
lines changed

12 files changed

+231
-7
lines changed

packages/durabletask-js/src/client/client.ts

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,26 @@ import { OrchestrationQuery, ListInstanceIdsOptions, DEFAULT_PAGE_SIZE } from ".
2222
import { Page, AsyncPageable, createAsyncPageable } from "../orchestration/page";
2323
import { FailureDetails } from "../task/failure-details";
2424
import { Logger, ConsoleLogger } from "../types/logger.type";
25+
import { StartOrchestrationOptions } from "../task/options";
2526

2627
// Re-export MetadataGenerator for backward compatibility
2728
export { MetadataGenerator } from "../utils/grpc-helper.util";
2829

30+
function mapToRecord(tagsMap?: { forEach: (cb: (value: string, key: string) => void) => void }):
31+
| Record<string, string>
32+
| undefined {
33+
if (!tagsMap) {
34+
return;
35+
}
36+
37+
const tags: Record<string, string> = {};
38+
tagsMap.forEach((value, key) => {
39+
tags[key] = value;
40+
});
41+
42+
return Object.keys(tags).length > 0 ? tags : undefined;
43+
}
44+
2945
/**
3046
* Options for creating a TaskHubGrpcClient.
3147
*/
@@ -133,13 +149,46 @@ export class TaskHubGrpcClient {
133149
input?: TInput,
134150
instanceId?: string,
135151
startAt?: Date,
152+
): Promise<string>;
153+
/**
154+
* Schedules a new orchestrator using the DurableTask client.
155+
*
156+
* @param {TOrchestrator | string} orchestrator - The orchestrator or the name of the orchestrator to be scheduled.
157+
* @param {TInput} input - Optional input for the orchestrator.
158+
* @param {StartOrchestrationOptions} options - Options for instance ID, start time, and tags.
159+
* @return {Promise<string>} A Promise resolving to the unique ID of the scheduled orchestrator instance.
160+
*/
161+
async scheduleNewOrchestration(
162+
orchestrator: TOrchestrator | string,
163+
input?: TInput,
164+
options?: StartOrchestrationOptions,
165+
): Promise<string>;
166+
async scheduleNewOrchestration(
167+
orchestrator: TOrchestrator | string,
168+
input?: TInput,
169+
instanceIdOrOptions?: string | StartOrchestrationOptions,
170+
startAt?: Date,
136171
): Promise<string> {
137172
let name;
138173
if (typeof orchestrator === "string") {
139174
name = orchestrator;
140175
} else {
141176
name = getName(orchestrator);
142177
}
178+
179+
const instanceId =
180+
typeof instanceIdOrOptions === "string" || instanceIdOrOptions === undefined
181+
? instanceIdOrOptions
182+
: instanceIdOrOptions.instanceId;
183+
const scheduledStartAt =
184+
typeof instanceIdOrOptions === "string" || instanceIdOrOptions === undefined
185+
? startAt
186+
: instanceIdOrOptions.startAt;
187+
const tags =
188+
typeof instanceIdOrOptions === "string" || instanceIdOrOptions === undefined
189+
? undefined
190+
: instanceIdOrOptions.tags;
191+
143192
const req = new pb.CreateInstanceRequest();
144193
req.setName(name);
145194
req.setInstanceid(instanceId ?? randomUUID());
@@ -148,11 +197,18 @@ export class TaskHubGrpcClient {
148197
i.setValue(JSON.stringify(input));
149198

150199
const ts = new Timestamp();
151-
ts.fromDate(new Date(startAt?.getTime() ?? 0));
200+
ts.fromDate(new Date(scheduledStartAt?.getTime() ?? 0));
152201

153202
req.setInput(i);
154203
req.setScheduledstarttimestamp(ts);
155204

205+
if (tags) {
206+
const tagsMap = req.getTagsMap();
207+
for (const [key, value] of Object.entries(tags)) {
208+
tagsMap.set(key, value);
209+
}
210+
}
211+
156212
this._logger.info(`Starting new ${name} instance with ID = ${req.getInstanceid()}`);
157213

158214
const res = await callWithMetadata<pb.CreateInstanceRequest, pb.CreateInstanceResponse>(
@@ -779,6 +835,8 @@ export class TaskHubGrpcClient {
779835
);
780836
}
781837

838+
const tags = mapToRecord(protoState.getTagsMap());
839+
782840
return new OrchestrationState(
783841
instanceId,
784842
name ?? "",
@@ -789,6 +847,7 @@ export class TaskHubGrpcClient {
789847
serializedOutput,
790848
serializedCustomStatus,
791849
failureDetails,
850+
tags,
792851
);
793852
}
794853
}

packages/durabletask-js/src/index.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,13 @@ export { Task } from "./task/task";
2727

2828
// Retry policies and task options
2929
export { RetryPolicy, RetryPolicyOptions } from "./task/retry";
30-
export { TaskOptions, SubOrchestrationOptions, taskOptionsFromRetryPolicy, subOrchestrationOptionsFromRetryPolicy } from "./task/options";
30+
export {
31+
TaskOptions,
32+
SubOrchestrationOptions,
33+
StartOrchestrationOptions,
34+
taskOptionsFromRetryPolicy,
35+
subOrchestrationOptionsFromRetryPolicy,
36+
} from "./task/options";
3137

3238
// Types
3339
export { TOrchestrator } from "./types/orchestrator.type";

packages/durabletask-js/src/orchestration/index.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,21 @@ import { FailureDetails } from "../task/failure-details";
66
import { fromProtobuf } from "./enum/orchestration-status.enum";
77
import { OrchestrationState } from "./orchestration-state";
88

9+
function mapToRecord(tagsMap?: { forEach: (cb: (value: string, key: string) => void) => void }):
10+
| Record<string, string>
11+
| undefined {
12+
if (!tagsMap) {
13+
return;
14+
}
15+
16+
const tags: Record<string, string> = {};
17+
tagsMap.forEach((value, key) => {
18+
tags[key] = value;
19+
});
20+
21+
return Object.keys(tags).length > 0 ? tags : undefined;
22+
}
23+
924
export function newOrchestrationState(
1025
instanceId: string,
1126
res?: pb.GetInstanceResponse,
@@ -43,6 +58,8 @@ export function newOrchestrationState(
4358
tsUpdatedParsed = new Date(tsUpdated.getSeconds() * 1000 + tsUpdated.getNanos() / 1000000);
4459
}
4560

61+
const tags = mapToRecord(state?.getTagsMap());
62+
4663
return new OrchestrationState(
4764
instanceId,
4865
state?.getName() ?? "",
@@ -53,5 +70,6 @@ export function newOrchestrationState(
5370
state?.getOutput()?.toString(),
5471
state?.getCustomstatus()?.toString(),
5572
failureDetails,
73+
tags,
5674
);
5775
}

packages/durabletask-js/src/orchestration/orchestration-state.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ export class OrchestrationState {
1515
serializedOutput?: string;
1616
serializedCustomStatus?: string;
1717
failureDetails?: FailureDetails;
18+
tags?: Record<string, string>;
1819

1920
constructor(
2021
instanceId: string,
@@ -26,6 +27,7 @@ export class OrchestrationState {
2627
serializedOutput?: string,
2728
serializedCustomStatus?: string,
2829
failureDetails?: FailureDetails,
30+
tags?: Record<string, string>,
2931
) {
3032
this.instanceId = instanceId;
3133
this.name = name;
@@ -36,6 +38,7 @@ export class OrchestrationState {
3638
this.serializedOutput = serializedOutput;
3739
this.serializedCustomStatus = serializedCustomStatus;
3840
this.failureDetails = failureDetails;
41+
this.tags = tags;
3942
}
4043

4144
raiseIfFailed(): void {

packages/durabletask-js/src/task/options/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
export {
55
TaskOptions,
66
SubOrchestrationOptions,
7+
StartOrchestrationOptions,
78
taskOptionsFromRetryPolicy,
89
subOrchestrationOptionsFromRetryPolicy,
910
} from "./task-options";

packages/durabletask-js/src/task/options/task-options.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ export interface TaskOptions {
1212
* Controls how many times a task is retried and the delay between retries.
1313
*/
1414
retry?: RetryPolicy;
15+
/**
16+
* The tags to associate with the task.
17+
*/
18+
tags?: Record<string, string>;
1519
}
1620

1721
/**
@@ -26,6 +30,26 @@ export interface SubOrchestrationOptions extends TaskOptions {
2630
instanceId?: string;
2731
}
2832

33+
/**
34+
* Options for scheduling new orchestrations via the client.
35+
*/
36+
export interface StartOrchestrationOptions {
37+
/**
38+
* The unique ID to use for the orchestration instance.
39+
* If not specified, a random UUID will be generated.
40+
*/
41+
instanceId?: string;
42+
/**
43+
* The time when the orchestration should start executing.
44+
* If not specified or in the past, it will start immediately.
45+
*/
46+
startAt?: Date;
47+
/**
48+
* The tags to associate with the orchestration instance.
49+
*/
50+
tags?: Record<string, string>;
51+
}
52+
2953
/**
3054
* Creates a TaskOptions instance from a RetryPolicy.
3155
*

packages/durabletask-js/src/utils/pb-helper.util.ts

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,19 @@ export function getStringValue(val?: string): StringValue | undefined {
243243
return stringValue;
244244
}
245245

246+
function setTagsMap(
247+
tagsMap: { set: (key: string, value: string) => void },
248+
tags?: Record<string, string>,
249+
): void {
250+
if (!tags) {
251+
return;
252+
}
253+
254+
for (const [key, value] of Object.entries(tags)) {
255+
tagsMap.set(key, value);
256+
}
257+
}
258+
246259
export function newCompleteOrchestrationAction(
247260
id: number,
248261
status: pb.OrchestrationStatus,
@@ -277,10 +290,16 @@ export function newCreateTimerAction(id: number, fireAt: Date): pb.OrchestratorA
277290
return action;
278291
}
279292

280-
export function newScheduleTaskAction(id: number, name: string, encodedInput?: string): pb.OrchestratorAction {
293+
export function newScheduleTaskAction(
294+
id: number,
295+
name: string,
296+
encodedInput?: string,
297+
tags?: Record<string, string>,
298+
): pb.OrchestratorAction {
281299
const scheduleTaskAction = new pb.ScheduleTaskAction();
282300
scheduleTaskAction.setName(name);
283301
scheduleTaskAction.setInput(getStringValue(encodedInput));
302+
setTagsMap(scheduleTaskAction.getTagsMap(), tags);
284303

285304
const action = new pb.OrchestratorAction();
286305
action.setId(id);
@@ -300,11 +319,13 @@ export function newCreateSubOrchestrationAction(
300319
name: string,
301320
instanceId?: string | null,
302321
encodedInput?: string,
322+
tags?: Record<string, string>,
303323
): pb.OrchestratorAction {
304324
const createSubOrchestrationAction = new pb.CreateSubOrchestrationAction();
305325
createSubOrchestrationAction.setName(name);
306326
createSubOrchestrationAction.setInstanceid(instanceId || "");
307327
createSubOrchestrationAction.setInput(getStringValue(encodedInput));
328+
setTagsMap(createSubOrchestrationAction.getTagsMap(), tags);
308329

309330
const action = new pb.OrchestratorAction();
310331
action.setId(id);

packages/durabletask-js/src/worker/runtime-orchestration-context.ts

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,21 @@ import { TOrchestrator } from "../types/orchestrator.type";
1515
import { Task } from "../task/task";
1616
import { StopIterationError } from "./exception/stop-iteration-error";
1717

18+
function mapToRecord(tagsMap?: { forEach: (cb: (value: string, key: string) => void) => void }):
19+
| Record<string, string>
20+
| undefined {
21+
if (!tagsMap) {
22+
return;
23+
}
24+
25+
const tags: Record<string, string> = {};
26+
tagsMap.forEach((value, key) => {
27+
tags[key] = value;
28+
});
29+
30+
return Object.keys(tags).length > 0 ? tags : undefined;
31+
}
32+
1833
export class RuntimeOrchestrationContext extends OrchestrationContext {
1934
_generator?: Generator<Task<any>, any, any>;
2035
_previousTask?: Task<any>;
@@ -262,7 +277,7 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {
262277
const id = this.nextSequenceNumber();
263278
const name = typeof activity === "string" ? activity : getName(activity);
264279
const encodedInput = input ? JSON.stringify(input) : undefined;
265-
const action = ph.newScheduleTaskAction(id, name, encodedInput);
280+
const action = ph.newScheduleTaskAction(id, name, encodedInput, options?.tags);
266281
this._pendingActions[action.getId()] = action;
267282

268283
// If a retry policy is provided, create a RetryableTask
@@ -306,7 +321,7 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {
306321
}
307322

308323
const encodedInput = input ? JSON.stringify(input) : undefined;
309-
const action = ph.newCreateSubOrchestrationAction(id, name, instanceId, encodedInput);
324+
const action = ph.newCreateSubOrchestrationAction(id, name, instanceId, encodedInput, options?.tags);
310325
this._pendingActions[action.getId()] = action;
311326

312327
// If a retry policy is provided, create a RetryableTask
@@ -520,7 +535,8 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {
520535
}
521536
const name = scheduleTask.getName();
522537
const input = scheduleTask.getInput()?.getValue();
523-
newAction = ph.newScheduleTaskAction(newId, name, input);
538+
const tags = mapToRecord(scheduleTask.getTagsMap());
539+
newAction = ph.newScheduleTaskAction(newId, name, input, tags);
524540
} else {
525541
// Reschedule a sub-orchestration task
526542
const subOrch = originalAction.getCreatesuborchestration();
@@ -530,7 +546,8 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {
530546
const name = subOrch.getName();
531547
const instanceId = subOrch.getInstanceid();
532548
const input = subOrch.getInput()?.getValue();
533-
newAction = ph.newCreateSubOrchestrationAction(newId, name, instanceId, input);
549+
const tags = mapToRecord(subOrch.getTagsMap());
550+
newAction = ph.newCreateSubOrchestrationAction(newId, name, instanceId, input, tags);
534551
}
535552

536553
// Register the new action

packages/durabletask-js/test/orchestration_executor.spec.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,25 @@ describe("Orchestration Executor", () => {
149149
expect(result.actions[0]?.getId()).toEqual(1);
150150
expect(result.actions[0]?.getScheduletask()?.getName()).toEqual("dummyActivity");
151151
});
152+
153+
it("should include tags on scheduled activity actions", async () => {
154+
const dummyActivity = async (_: ActivityContext) => {
155+
// do nothing
156+
};
157+
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any) {
158+
yield ctx.callActivity(dummyActivity, undefined, { tags: { env: "test", owner: "durable" } });
159+
return "done";
160+
};
161+
const registry = new Registry();
162+
const name = registry.addOrchestrator(orchestrator);
163+
const newEvents = [newExecutionStartedEvent(name, TEST_INSTANCE_ID, undefined)];
164+
const executor = new OrchestrationExecutor(registry, testLogger);
165+
const result = await executor.execute(TEST_INSTANCE_ID, [], newEvents);
166+
const scheduleTask = result.actions[0]?.getScheduletask();
167+
168+
expect(scheduleTask?.getTagsMap().get("env")).toEqual("test");
169+
expect(scheduleTask?.getTagsMap().get("owner")).toEqual("durable");
170+
});
152171
it("should test the successful completion of an activity task", async () => {
153172
const dummyActivity = async (_: ActivityContext) => {
154173
// do nothing
@@ -348,6 +367,26 @@ describe("Orchestration Executor", () => {
348367
expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
349368
expect(completeAction?.getResult()?.getValue()).toEqual("42");
350369
});
370+
371+
it("should include tags on scheduled sub-orchestration actions", async () => {
372+
const subOrchestrator = async (_: OrchestrationContext) => {
373+
// do nothing
374+
};
375+
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any): any {
376+
yield ctx.callSubOrchestrator(subOrchestrator, undefined, { tags: { env: "test" } });
377+
return "done";
378+
};
379+
const registry = new Registry();
380+
const subOrchestratorName = registry.addOrchestrator(subOrchestrator);
381+
const orchestratorName = registry.addOrchestrator(orchestrator);
382+
const newEvents = [newExecutionStartedEvent(orchestratorName, TEST_INSTANCE_ID, undefined)];
383+
const executor = new OrchestrationExecutor(registry, testLogger);
384+
const result = await executor.execute(TEST_INSTANCE_ID, [], newEvents);
385+
const createSubOrch = result.actions[0]?.getCreatesuborchestration();
386+
387+
expect(createSubOrch?.getName()).toEqual(subOrchestratorName);
388+
expect(createSubOrch?.getTagsMap().get("env")).toEqual("test");
389+
});
351390
it("should test that a sub-orchestration task is completed when the sub-orchestration fails", async () => {
352391
const subOrchestrator = async (_: OrchestrationContext) => {
353392
// do nothing

0 commit comments

Comments
 (0)