Skip to content

Commit a6c12f8

Browse files
committed
refactor
1 parent 7d4d811 commit a6c12f8

File tree

3 files changed

+66
-76
lines changed

3 files changed

+66
-76
lines changed

packages/durabletask-js/src/client/client.ts

Lines changed: 24 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) Microsoft Corporation. All rights reserved.
22
// Licensed under the MIT License.
33

4+
import * as grpc from "@grpc/grpc-js";
45
import { StringValue } from "google-protobuf/google/protobuf/wrappers_pb";
56
import { Timestamp } from "google-protobuf/google/protobuf/timestamp_pb";
67
import * as pb from "../proto/orchestrator_service_pb";
@@ -16,13 +17,10 @@ import { OrchestrationStatus, toProtobuf } from "../orchestration/enum/orchestra
1617
import { TimeoutError } from "../exception/timeout-error";
1718
import { PurgeResult } from "../orchestration/orchestration-purge-result";
1819
import { PurgeInstanceCriteria } from "../orchestration/orchestration-purge-criteria";
19-
import * as grpc from "@grpc/grpc-js";
20+
import { callWithMetadata, MetadataGenerator } from "../utils/grpc-helper.util";
2021

21-
/**
22-
* A function that generates gRPC metadata for each call.
23-
* This is used for adding authentication tokens, task hub names, and other per-call metadata.
24-
*/
25-
export type MetadataGenerator = () => grpc.Metadata | Promise<grpc.Metadata>;
22+
// Re-export MetadataGenerator for backward compatibility
23+
export { MetadataGenerator } from "../utils/grpc-helper.util";
2624

2725
export class TaskHubGrpcClient {
2826
private _stub: stubs.TaskHubSidecarServiceClient;
@@ -48,39 +46,6 @@ export class TaskHubGrpcClient {
4846
this._metadataGenerator = metadataGenerator;
4947
}
5048

51-
/**
52-
* Helper to get metadata for gRPC calls.
53-
*/
54-
private async _getMetadata(): Promise<grpc.Metadata> {
55-
if (this._metadataGenerator) {
56-
return await this._metadataGenerator();
57-
}
58-
return new grpc.Metadata();
59-
}
60-
61-
/**
62-
* Helper to promisify a gRPC call with metadata support.
63-
*/
64-
private _callWithMetadata<TReq, TRes>(
65-
method: (
66-
req: TReq,
67-
metadata: grpc.Metadata,
68-
callback: (error: grpc.ServiceError | null, response: TRes) => void,
69-
) => grpc.ClientUnaryCall,
70-
req: TReq,
71-
): Promise<TRes> {
72-
return new Promise(async (resolve, reject) => {
73-
const metadata = await this._getMetadata();
74-
method(req, metadata, (error, response) => {
75-
if (error) {
76-
reject(error);
77-
} else {
78-
resolve(response);
79-
}
80-
});
81-
});
82-
}
83-
8449
async stop(): Promise<void> {
8550
await this._stub.close();
8651

@@ -122,9 +87,10 @@ export class TaskHubGrpcClient {
12287

12388
console.log(`Starting new ${name} instance with ID = ${req.getInstanceid()}`);
12489

125-
const res = await this._callWithMetadata<pb.CreateInstanceRequest, pb.CreateInstanceResponse>(
90+
const res = await callWithMetadata<pb.CreateInstanceRequest, pb.CreateInstanceResponse>(
12691
this._stub.startInstance.bind(this._stub),
12792
req,
93+
this._metadataGenerator,
12894
);
12995

13096
return res.getInstanceid();
@@ -148,9 +114,10 @@ export class TaskHubGrpcClient {
148114
req.setInstanceid(instanceId);
149115
req.setGetinputsandoutputs(fetchPayloads);
150116

151-
const res = await this._callWithMetadata<pb.GetInstanceRequest, pb.GetInstanceResponse>(
117+
const res = await callWithMetadata<pb.GetInstanceRequest, pb.GetInstanceResponse>(
152118
this._stub.getInstance.bind(this._stub),
153119
req,
120+
this._metadataGenerator,
154121
);
155122

156123
return newOrchestrationState(req.getInstanceid(), res);
@@ -182,9 +149,10 @@ export class TaskHubGrpcClient {
182149
req.setGetinputsandoutputs(fetchPayloads);
183150

184151
try {
185-
const callPromise = this._callWithMetadata<pb.GetInstanceRequest, pb.GetInstanceResponse>(
152+
const callPromise = callWithMetadata<pb.GetInstanceRequest, pb.GetInstanceResponse>(
186153
this._stub.waitForInstanceStart.bind(this._stub),
187154
req,
155+
this._metadataGenerator,
188156
);
189157

190158
// Execute the request and wait for the first response or timeout
@@ -229,9 +197,10 @@ export class TaskHubGrpcClient {
229197
try {
230198
console.info(`Waiting ${timeout} seconds for instance ${instanceId} to complete...`);
231199

232-
const callPromise = this._callWithMetadata<pb.GetInstanceRequest, pb.GetInstanceResponse>(
200+
const callPromise = callWithMetadata<pb.GetInstanceRequest, pb.GetInstanceResponse>(
233201
this._stub.waitForInstanceCompletion.bind(this._stub),
234202
req,
203+
this._metadataGenerator,
235204
);
236205

237206
// Execute the request and wait for the first response or timeout
@@ -286,9 +255,10 @@ export class TaskHubGrpcClient {
286255

287256
console.log(`Raising event '${eventName}' for instance '${instanceId}'`);
288257

289-
await this._callWithMetadata<pb.RaiseEventRequest, pb.RaiseEventResponse>(
258+
await callWithMetadata<pb.RaiseEventRequest, pb.RaiseEventResponse>(
290259
this._stub.raiseEvent.bind(this._stub),
291260
req,
261+
this._metadataGenerator,
292262
);
293263
}
294264

@@ -309,9 +279,10 @@ export class TaskHubGrpcClient {
309279

310280
console.log(`Terminating '${instanceId}'`);
311281

312-
await this._callWithMetadata<pb.TerminateRequest, pb.TerminateResponse>(
282+
await callWithMetadata<pb.TerminateRequest, pb.TerminateResponse>(
313283
this._stub.terminateInstance.bind(this._stub),
314284
req,
285+
this._metadataGenerator,
315286
);
316287
}
317288

@@ -321,9 +292,10 @@ export class TaskHubGrpcClient {
321292

322293
console.log(`Suspending '${instanceId}'`);
323294

324-
await this._callWithMetadata<pb.SuspendRequest, pb.SuspendResponse>(
295+
await callWithMetadata<pb.SuspendRequest, pb.SuspendResponse>(
325296
this._stub.suspendInstance.bind(this._stub),
326297
req,
298+
this._metadataGenerator,
327299
);
328300
}
329301

@@ -333,9 +305,10 @@ export class TaskHubGrpcClient {
333305

334306
console.log(`Resuming '${instanceId}'`);
335307

336-
await this._callWithMetadata<pb.ResumeRequest, pb.ResumeResponse>(
308+
await callWithMetadata<pb.ResumeRequest, pb.ResumeResponse>(
337309
this._stub.resumeInstance.bind(this._stub),
338310
req,
311+
this._metadataGenerator,
339312
);
340313
}
341314

@@ -365,9 +338,10 @@ export class TaskHubGrpcClient {
365338

366339
console.log(`Purging Instance '${instanceId}'`);
367340

368-
res = await this._callWithMetadata<pb.PurgeInstancesRequest, pb.PurgeInstancesResponse>(
341+
res = await callWithMetadata<pb.PurgeInstancesRequest, pb.PurgeInstancesResponse>(
369342
this._stub.purgeInstances.bind(this._stub),
370343
req,
344+
this._metadataGenerator,
371345
);
372346
} else {
373347
const purgeInstanceCriteria = value;
@@ -394,9 +368,10 @@ export class TaskHubGrpcClient {
394368

395369
console.log("Purging Instance using purging criteria");
396370

397-
const callPromise = this._callWithMetadata<pb.PurgeInstancesRequest, pb.PurgeInstancesResponse>(
371+
const callPromise = callWithMetadata<pb.PurgeInstancesRequest, pb.PurgeInstancesResponse>(
398372
this._stub.purgeInstances.bind(this._stub),
399373
req,
374+
this._metadataGenerator,
400375
);
401376
// Execute the request and wait for the first response or timeout
402377
res = (await Promise.race([
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
import * as grpc from "@grpc/grpc-js";
5+
6+
/**
7+
* Type for a function that generates gRPC metadata (e.g., for taskhub, auth tokens).
8+
*/
9+
export type MetadataGenerator = () => Promise<grpc.Metadata>;
10+
11+
/**
12+
* Promisifies a gRPC unary call with metadata support.
13+
*
14+
* @param method The gRPC method to call (must be bound to the stub).
15+
* @param req The request object.
16+
* @param metadataGenerator Optional function to generate metadata for the call.
17+
* @returns A promise that resolves with the response or rejects with an error.
18+
*/
19+
export function callWithMetadata<TReq, TRes>(
20+
method: (
21+
req: TReq,
22+
metadata: grpc.Metadata,
23+
callback: (error: grpc.ServiceError | null, response: TRes) => void,
24+
) => grpc.ClientUnaryCall,
25+
req: TReq,
26+
metadataGenerator?: MetadataGenerator,
27+
): Promise<TRes> {
28+
return new Promise(async (resolve, reject) => {
29+
const metadata = metadataGenerator ? await metadataGenerator() : new grpc.Metadata();
30+
method(req, metadata, (error, response) => {
31+
if (error) {
32+
reject(error);
33+
} else {
34+
resolve(response);
35+
}
36+
});
37+
});
38+
}

packages/durabletask-js/src/worker/task-hub-grpc-worker.ts

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ import { TOutput } from "../types/output.type";
1212
import { GrpcClient } from "../client/client-grpc";
1313
import { Empty } from "google-protobuf/google/protobuf/empty_pb";
1414
import * as pbh from "../utils/pb-helper.util";
15+
import { callWithMetadata, MetadataGenerator } from "../utils/grpc-helper.util";
1516
import { OrchestrationExecutor } from "./orchestration-executor";
1617
import { ActivityExecutor } from "./activity-executor";
1718
import { StringValue } from "google-protobuf/google/protobuf/wrappers_pb";
18-
import { MetadataGenerator } from "../client/client";
1919

2020
export class TaskHubGrpcWorker {
2121
private _responseStream: grpc.ClientReadableStream<pb.WorkItem> | null;
@@ -67,29 +67,6 @@ export class TaskHubGrpcWorker {
6767
return new grpc.Metadata();
6868
}
6969

70-
/**
71-
* Helper to promisify a gRPC unary call with metadata support.
72-
*/
73-
private _callWithMetadata<TReq, TRes>(
74-
method: (
75-
req: TReq,
76-
metadata: grpc.Metadata,
77-
callback: (error: grpc.ServiceError | null, response: TRes) => void,
78-
) => grpc.ClientUnaryCall,
79-
req: TReq,
80-
): Promise<TRes> {
81-
return new Promise(async (resolve, reject) => {
82-
const metadata = await this._getMetadata();
83-
method(req, metadata, (error, response) => {
84-
if (error) {
85-
reject(error);
86-
} else {
87-
resolve(response);
88-
}
89-
});
90-
});
91-
}
92-
9370
/**
9471
* Registers an orchestrator function with the worker.
9572
*
@@ -169,7 +146,7 @@ export class TaskHubGrpcWorker {
169146
async internalRunWorker(client: GrpcClient, isRetry: boolean = false): Promise<void> {
170147
try {
171148
// send a "Hello" message to the sidecar to ensure that it's listening
172-
await this._callWithMetadata(client.stub.hello.bind(client.stub), new Empty());
149+
await callWithMetadata(client.stub.hello.bind(client.stub), new Empty(), this._metadataGenerator);
173150

174151
// Stream work items from the sidecar (pass metadata for insecure connections)
175152
const metadata = await this._getMetadata();
@@ -304,7 +281,7 @@ export class TaskHubGrpcWorker {
304281
}
305282

306283
try {
307-
await this._callWithMetadata(stub.completeOrchestratorTask.bind(stub), res);
284+
await callWithMetadata(stub.completeOrchestratorTask.bind(stub), res, this._metadataGenerator);
308285
} catch (e: any) {
309286
console.error(`An error occurred while trying to complete instance '${req.getInstanceid()}': ${e?.message}`);
310287
}
@@ -356,7 +333,7 @@ export class TaskHubGrpcWorker {
356333
}
357334

358335
try {
359-
await this._callWithMetadata(stub.completeActivityTask.bind(stub), res);
336+
await callWithMetadata(stub.completeActivityTask.bind(stub), res, this._metadataGenerator);
360337
} catch (e: any) {
361338
console.error(
362339
`Failed to deliver activity response for '${req.getName()}#${req.getTaskid()}' of orchestration ID '${instanceId}' to sidecar: ${

0 commit comments

Comments
 (0)