Skip to content

Commit 9d5bbc3

Browse files
authored
feat: Implement work item filters (#168)
1 parent 069700f commit 9d5bbc3

15 files changed

Lines changed: 2359 additions & 24 deletions

File tree

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
/**
5+
* This example demonstrates Work Item Filters for Durable Task workers.
6+
*
7+
* Work Item Filters allow a worker to tell the sidecar which orchestrations,
8+
* activities, and entities it is configured to handle. The sidecar then only
9+
* dispatches matching work items to that worker, enabling efficient routing.
10+
*
11+
* Key concepts demonstrated:
12+
* - Auto-generated filters from the worker's registry (default behavior)
13+
* - Explicit filters via useWorkItemFilters()
14+
*
15+
* This example runs against:
16+
* DTS Emulator:
17+
* docker run --name dts-emulator -i -p 8080:8080 -d --rm mcr.microsoft.com/dts/dts-emulator:latest
18+
* Then:
19+
* npx ts-node --swc examples/work-item-filters/index.ts
20+
*/
21+
22+
import {
23+
ActivityContext,
24+
OrchestrationContext,
25+
TOrchestrator,
26+
WorkItemFilters,
27+
} from "@microsoft/durabletask-js";
28+
import {
29+
DurableTaskAzureManagedClientBuilder,
30+
DurableTaskAzureManagedWorkerBuilder,
31+
} from "@microsoft/durabletask-js-azuremanaged";
32+
33+
const endpoint = process.env.ENDPOINT || "localhost:8080";
34+
const taskHub = process.env.TASKHUB || "default";
35+
36+
// ============================================================================
37+
// Step 1: Define activities and orchestrators
38+
// ============================================================================
39+
40+
const greet = async (_: ActivityContext, name: string): Promise<string> => {
41+
return `Hello, ${name}!`;
42+
};
43+
44+
const add = async (_: ActivityContext, input: { a: number; b: number }): Promise<number> => {
45+
return input.a + input.b;
46+
};
47+
48+
const greetingOrchestrator: TOrchestrator = async function* (
49+
ctx: OrchestrationContext,
50+
name: string,
51+
): Promise<any> {
52+
const result = yield ctx.callActivity(greet, name);
53+
return result;
54+
};
55+
56+
const mathOrchestrator: TOrchestrator = async function* (
57+
ctx: OrchestrationContext,
58+
input: { a: number; b: number },
59+
): Promise<any> {
60+
const result = yield ctx.callActivity(add, input);
61+
return result;
62+
};
63+
64+
// ============================================================================
65+
// Step 2: Demonstrate different work item filter configurations
66+
// ============================================================================
67+
68+
async function runWithAutoGeneratedFilters() {
69+
console.log("\n=== Scenario 1: Auto-Generated Filters (Default) ===");
70+
console.log("The worker auto-generates filters from its registered orchestrators and activities.");
71+
console.log("Only matching work items will be dispatched to this worker.\n");
72+
73+
const client = new DurableTaskAzureManagedClientBuilder()
74+
.endpoint(endpoint, taskHub, null)
75+
.build();
76+
77+
// No explicit filters — they are auto-generated from addOrchestrator/addActivity
78+
const worker = new DurableTaskAzureManagedWorkerBuilder()
79+
.endpoint(endpoint, taskHub, null)
80+
.addOrchestrator(greetingOrchestrator)
81+
.addActivity(greet)
82+
.build();
83+
84+
await worker.start();
85+
console.log("Worker started with auto-generated filters for: greetingOrchestrator, greet");
86+
87+
const id = await client.scheduleNewOrchestration(greetingOrchestrator, "Auto-Filters");
88+
console.log(`Scheduled orchestration: ${id}`);
89+
90+
const state = await client.waitForOrchestrationCompletion(id, undefined, 30);
91+
console.log(`Result: ${state?.serializedOutput}`);
92+
93+
await worker.stop();
94+
await client.stop();
95+
}
96+
97+
async function runWithExplicitFilters() {
98+
console.log("\n=== Scenario 2: Explicit Filters ===");
99+
console.log("The worker uses explicitly provided filters instead of auto-generating them.");
100+
console.log("This is useful when you want fine-grained control over which work items to accept.\n");
101+
102+
const client = new DurableTaskAzureManagedClientBuilder()
103+
.endpoint(endpoint, taskHub, null)
104+
.build();
105+
106+
// Provide explicit filters — these override auto-generation
107+
const filters: WorkItemFilters = {
108+
orchestrations: [{ name: "mathOrchestrator" }],
109+
activities: [{ name: "add" }],
110+
};
111+
112+
const worker = new DurableTaskAzureManagedWorkerBuilder()
113+
.endpoint(endpoint, taskHub, null)
114+
.addOrchestrator(mathOrchestrator)
115+
.addActivity(add)
116+
.useWorkItemFilters(filters)
117+
.build();
118+
119+
await worker.start();
120+
console.log("Worker started with explicit filters for: mathOrchestrator, add");
121+
122+
const id = await client.scheduleNewOrchestration(mathOrchestrator, { a: 17, b: 25 });
123+
console.log(`Scheduled orchestration: ${id}`);
124+
125+
const state = await client.waitForOrchestrationCompletion(id, undefined, 30);
126+
console.log(`Result: ${state?.serializedOutput}`);
127+
128+
await worker.stop();
129+
await client.stop();
130+
}
131+
132+
// ============================================================================
133+
// Step 3: Run all scenarios
134+
// ============================================================================
135+
136+
(async () => {
137+
console.log(`Connecting to DTS emulator at ${endpoint}, taskHub: ${taskHub}`);
138+
139+
try {
140+
await runWithAutoGeneratedFilters();
141+
await runWithExplicitFilters();
142+
143+
console.log("\n=== All scenarios completed successfully! ===");
144+
} catch (error) {
145+
console.error("Error:", error);
146+
process.exit(1);
147+
}
148+
149+
process.exit(0);
150+
})();
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
{
2+
"name": "work-item-filters-example",
3+
"version": "1.0.0",
4+
"description": "Example demonstrating work item filters for Durable Task workers",
5+
"private": true,
6+
"scripts": {
7+
"start": "ts-node --swc index.ts",
8+
"start:emulator": "ENDPOINT=localhost:8080 TASKHUB=default ts-node --swc index.ts"
9+
},
10+
"dependencies": {
11+
"@microsoft/durabletask-js": "workspace:*",
12+
"@microsoft/durabletask-js-azuremanaged": "workspace:*"
13+
},
14+
"engines": {
15+
"node": ">=22.0.0"
16+
}
17+
}

internal/protocol/SOURCE_COMMIT

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
026329c53fe6363985655857b9ca848ec7238bd2
1+
1caadbd7ecfdf5f2309acbeac28a3e36d16aa156

internal/protocol/protos/orchestrator_service.proto

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -822,6 +822,7 @@ message GetWorkItemsRequest {
822822
int32 maxConcurrentEntityWorkItems = 3;
823823

824824
repeated WorkerCapability capabilities = 10;
825+
WorkItemFilters workItemFilters = 11;
825826
}
826827

827828
enum WorkerCapability {
@@ -844,6 +845,26 @@ enum WorkerCapability {
844845
WORKER_CAPABILITY_LARGE_PAYLOADS = 3;
845846
}
846847

848+
message WorkItemFilters {
849+
repeated OrchestrationFilter orchestrations = 1;
850+
repeated ActivityFilter activities = 2;
851+
repeated EntityFilter entities = 3;
852+
}
853+
854+
message OrchestrationFilter {
855+
string name = 1;
856+
repeated string versions = 2;
857+
}
858+
859+
message ActivityFilter {
860+
string name = 1;
861+
repeated string versions = 2;
862+
}
863+
864+
message EntityFilter {
865+
string name = 1;
866+
}
867+
847868
message WorkItem {
848869
oneof request {
849870
OrchestratorRequest orchestratorRequest = 1;

package-lock.json

Lines changed: 0 additions & 17 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/durabletask-js-azuremanaged/src/worker-builder.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import {
1313
Logger,
1414
ConsoleLogger,
1515
VersioningOptions,
16+
WorkItemFilters,
1617
} from "@microsoft/durabletask-js";
1718

1819
/**
@@ -27,6 +28,7 @@ export class DurableTaskAzureManagedWorkerBuilder {
2728
private _logger: Logger = new ConsoleLogger();
2829
private _shutdownTimeoutMs?: number;
2930
private _versioning?: VersioningOptions;
31+
private _workItemFilters?: WorkItemFilters | "auto";
3032

3133
/**
3234
* Creates a new instance of DurableTaskAzureManagedWorkerBuilder.
@@ -220,6 +222,21 @@ export class DurableTaskAzureManagedWorkerBuilder {
220222
return this;
221223
}
222224

225+
/**
226+
* Enables work item filters for the worker.
227+
* When called without arguments, filters are auto-generated from the registered
228+
* orchestrations, activities, and entities.
229+
* When called with a WorkItemFilters object, those specific filters are used.
230+
* By default (when not called), no filters are sent and the worker processes all work items.
231+
*
232+
* @param filters Optional explicit filters. Omit to auto-generate from registry.
233+
* @returns This builder instance.
234+
*/
235+
useWorkItemFilters(filters?: WorkItemFilters): DurableTaskAzureManagedWorkerBuilder {
236+
this._workItemFilters = filters ?? "auto";
237+
return this;
238+
}
239+
223240
/**
224241
* Builds and returns a configured TaskHubGrpcWorker.
225242
*
@@ -251,6 +268,7 @@ export class DurableTaskAzureManagedWorkerBuilder {
251268
logger: this._logger,
252269
shutdownTimeoutMs: this._shutdownTimeoutMs,
253270
versioning: this._versioning,
271+
workItemFilters: this._workItemFilters,
254272
});
255273

256274
// Register all orchestrators

0 commit comments

Comments
 (0)