Skip to content

Commit 2e14296

Browse files
committed
feat: Enhance TaskHubGrpcClient and TaskHubGrpcWorker with options interfaces and constructors
1 parent 16d4810 commit 2e14296

File tree

4 files changed

+174
-47
lines changed

4 files changed

+174
-47
lines changed

packages/durabletask-js/src/client/client.ts

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,36 @@ import { Logger, ConsoleLogger } from "../types/logger.type";
2626
// Re-export MetadataGenerator for backward compatibility
2727
export { MetadataGenerator } from "../utils/grpc-helper.util";
2828

29+
/**
30+
* Options for creating a TaskHubGrpcClient.
31+
*/
32+
export interface TaskHubGrpcClientOptions {
33+
/** The host address to connect to. Defaults to "localhost:4001". */
34+
hostAddress?: string;
35+
/** gRPC channel options. */
36+
options?: grpc.ChannelOptions;
37+
/** Whether to use TLS. Defaults to false. */
38+
useTLS?: boolean;
39+
/** Optional pre-configured channel credentials. If provided, useTLS is ignored. */
40+
credentials?: grpc.ChannelCredentials;
41+
/** Optional function to generate per-call metadata (for taskhub, auth tokens, etc.). */
42+
metadataGenerator?: MetadataGenerator;
43+
/** Optional logger instance. Defaults to ConsoleLogger. */
44+
logger?: Logger;
45+
}
46+
2947
export class TaskHubGrpcClient {
3048
private _stub: stubs.TaskHubSidecarServiceClient;
3149
private _metadataGenerator?: MetadataGenerator;
3250
private _logger: Logger;
3351

52+
/**
53+
* Creates a new TaskHubGrpcClient instance.
54+
*
55+
* @param options Configuration options for the client.
56+
*/
57+
constructor(options: TaskHubGrpcClientOptions);
58+
3459
/**
3560
* Creates a new TaskHubGrpcClient instance.
3661
*
@@ -40,6 +65,7 @@ export class TaskHubGrpcClient {
4065
* @param credentials Optional pre-configured channel credentials. If provided, useTLS is ignored.
4166
* @param metadataGenerator Optional function to generate per-call metadata (for taskhub, auth tokens, etc.).
4267
* @param logger Optional logger instance. Defaults to ConsoleLogger.
68+
* @deprecated Use the options object constructor instead.
4369
*/
4470
constructor(
4571
hostAddress?: string,
@@ -48,10 +74,44 @@ export class TaskHubGrpcClient {
4874
credentials?: grpc.ChannelCredentials,
4975
metadataGenerator?: MetadataGenerator,
5076
logger?: Logger,
77+
);
78+
79+
constructor(
80+
hostAddressOrOptions?: string | TaskHubGrpcClientOptions,
81+
options?: grpc.ChannelOptions,
82+
useTLS?: boolean,
83+
credentials?: grpc.ChannelCredentials,
84+
metadataGenerator?: MetadataGenerator,
85+
logger?: Logger,
5186
) {
52-
this._stub = new GrpcClient(hostAddress, options, useTLS, credentials).stub;
53-
this._metadataGenerator = metadataGenerator;
54-
this._logger = logger ?? new ConsoleLogger();
87+
let resolvedHostAddress: string | undefined;
88+
let resolvedOptions: grpc.ChannelOptions | undefined;
89+
let resolvedUseTLS: boolean | undefined;
90+
let resolvedCredentials: grpc.ChannelCredentials | undefined;
91+
let resolvedMetadataGenerator: MetadataGenerator | undefined;
92+
let resolvedLogger: Logger | undefined;
93+
94+
if (typeof hostAddressOrOptions === "object" && hostAddressOrOptions !== null) {
95+
// Options object constructor
96+
resolvedHostAddress = hostAddressOrOptions.hostAddress;
97+
resolvedOptions = hostAddressOrOptions.options;
98+
resolvedUseTLS = hostAddressOrOptions.useTLS;
99+
resolvedCredentials = hostAddressOrOptions.credentials;
100+
resolvedMetadataGenerator = hostAddressOrOptions.metadataGenerator;
101+
resolvedLogger = hostAddressOrOptions.logger;
102+
} else {
103+
// Deprecated positional parameters constructor
104+
resolvedHostAddress = hostAddressOrOptions;
105+
resolvedOptions = options;
106+
resolvedUseTLS = useTLS;
107+
resolvedCredentials = credentials;
108+
resolvedMetadataGenerator = metadataGenerator;
109+
resolvedLogger = logger;
110+
}
111+
112+
this._stub = new GrpcClient(resolvedHostAddress, resolvedOptions, resolvedUseTLS, resolvedCredentials).stub;
113+
this._metadataGenerator = resolvedMetadataGenerator;
114+
this._logger = resolvedLogger ?? new ConsoleLogger();
55115
}
56116

57117
async stop(): Promise<void> {

packages/durabletask-js/src/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
// Licensed under the MIT License.
33

44
// Client and Worker
5-
export { TaskHubGrpcClient, MetadataGenerator } from "./client/client";
6-
export { TaskHubGrpcWorker } from "./worker/task-hub-grpc-worker";
5+
export { TaskHubGrpcClient, TaskHubGrpcClientOptions, MetadataGenerator } from "./client/client";
6+
export { TaskHubGrpcWorker, TaskHubGrpcWorkerOptions } from "./worker/task-hub-grpc-worker";
77

88
// Contexts
99
export { OrchestrationContext } from "./task/context/orchestration-context";

packages/durabletask-js/src/worker/orchestration-executor.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ import { Registry } from "./registry";
2323
import { RuntimeOrchestrationContext } from "./runtime-orchestration-context";
2424

2525
export class OrchestrationExecutor {
26-
_generator?: TOrchestrator;
27-
_registry: Registry;
28-
_isSuspended: boolean;
29-
_suspendedEvents: pb.HistoryEvent[];
26+
private _generator?: TOrchestrator;
27+
private _registry: Registry;
28+
private _isSuspended: boolean;
29+
private _suspendedEvents: pb.HistoryEvent[];
3030
private _logger: Logger;
3131

3232
constructor(registry: Registry, logger?: Logger) {

packages/durabletask-js/src/worker/task-hub-grpc-worker.ts

Lines changed: 105 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,26 @@ import { ExponentialBackoff, sleep, withTimeout } from "../utils/backoff.util";
2222
/** Default timeout in milliseconds for graceful shutdown. */
2323
const DEFAULT_SHUTDOWN_TIMEOUT_MS = 30000;
2424

25+
/**
26+
* Options for creating a TaskHubGrpcWorker.
27+
*/
28+
export interface TaskHubGrpcWorkerOptions {
29+
/** The host address to connect to. Defaults to "localhost:4001". */
30+
hostAddress?: string;
31+
/** gRPC channel options. */
32+
options?: grpc.ChannelOptions;
33+
/** Whether to use TLS. Defaults to false. */
34+
useTLS?: boolean;
35+
/** Optional pre-configured channel credentials. If provided, useTLS is ignored. */
36+
credentials?: grpc.ChannelCredentials;
37+
/** Optional function to generate per-call metadata (for taskhub, auth tokens, etc.). */
38+
metadataGenerator?: MetadataGenerator;
39+
/** Optional logger instance. Defaults to ConsoleLogger. */
40+
logger?: Logger;
41+
/** Optional timeout in milliseconds for graceful shutdown. Defaults to 30000. */
42+
shutdownTimeoutMs?: number;
43+
}
44+
2545
export class TaskHubGrpcWorker {
2646
private _responseStream: grpc.ClientReadableStream<pb.WorkItem> | null;
2747
private _registry: Registry;
@@ -38,6 +58,13 @@ export class TaskHubGrpcWorker {
3858
private _shutdownTimeoutMs: number;
3959
private _backoff: ExponentialBackoff;
4060

61+
/**
62+
* Creates a new TaskHubGrpcWorker instance.
63+
*
64+
* @param options Configuration options for the worker.
65+
*/
66+
constructor(options: TaskHubGrpcWorkerOptions);
67+
4168
/**
4269
* Creates a new TaskHubGrpcWorker instance.
4370
*
@@ -48,6 +75,7 @@ export class TaskHubGrpcWorker {
4875
* @param metadataGenerator Optional function to generate per-call metadata (for taskhub, auth tokens, etc.).
4976
* @param logger Optional logger instance. Defaults to ConsoleLogger.
5077
* @param shutdownTimeoutMs Optional timeout in milliseconds for graceful shutdown. Defaults to 30000.
78+
* @deprecated Use the options object constructor instead.
5179
*/
5280
constructor(
5381
hostAddress?: string,
@@ -57,20 +85,58 @@ export class TaskHubGrpcWorker {
5785
metadataGenerator?: MetadataGenerator,
5886
logger?: Logger,
5987
shutdownTimeoutMs?: number,
88+
);
89+
90+
constructor(
91+
hostAddressOrOptions?: string | TaskHubGrpcWorkerOptions,
92+
options?: grpc.ChannelOptions,
93+
useTLS?: boolean,
94+
credentials?: grpc.ChannelCredentials,
95+
metadataGenerator?: MetadataGenerator,
96+
logger?: Logger,
97+
shutdownTimeoutMs?: number,
6098
) {
99+
let resolvedHostAddress: string | undefined;
100+
let resolvedOptions: grpc.ChannelOptions | undefined;
101+
let resolvedUseTLS: boolean | undefined;
102+
let resolvedCredentials: grpc.ChannelCredentials | undefined;
103+
let resolvedMetadataGenerator: MetadataGenerator | undefined;
104+
let resolvedLogger: Logger | undefined;
105+
let resolvedShutdownTimeoutMs: number | undefined;
106+
107+
if (typeof hostAddressOrOptions === "object" && hostAddressOrOptions !== null) {
108+
// Options object constructor
109+
resolvedHostAddress = hostAddressOrOptions.hostAddress;
110+
resolvedOptions = hostAddressOrOptions.options;
111+
resolvedUseTLS = hostAddressOrOptions.useTLS;
112+
resolvedCredentials = hostAddressOrOptions.credentials;
113+
resolvedMetadataGenerator = hostAddressOrOptions.metadataGenerator;
114+
resolvedLogger = hostAddressOrOptions.logger;
115+
resolvedShutdownTimeoutMs = hostAddressOrOptions.shutdownTimeoutMs;
116+
} else {
117+
// Deprecated positional parameters constructor
118+
resolvedHostAddress = hostAddressOrOptions;
119+
resolvedOptions = options;
120+
resolvedUseTLS = useTLS;
121+
resolvedCredentials = credentials;
122+
resolvedMetadataGenerator = metadataGenerator;
123+
resolvedLogger = logger;
124+
resolvedShutdownTimeoutMs = shutdownTimeoutMs;
125+
}
126+
61127
this._registry = new Registry();
62-
this._hostAddress = hostAddress;
63-
this._tls = useTLS;
64-
this._grpcChannelOptions = options;
65-
this._grpcChannelCredentials = credentials;
66-
this._metadataGenerator = metadataGenerator;
128+
this._hostAddress = resolvedHostAddress;
129+
this._tls = resolvedUseTLS;
130+
this._grpcChannelOptions = resolvedOptions;
131+
this._grpcChannelCredentials = resolvedCredentials;
132+
this._metadataGenerator = resolvedMetadataGenerator;
67133
this._responseStream = null;
68134
this._isRunning = false;
69135
this._stopWorker = false;
70136
this._stub = null;
71-
this._logger = logger ?? new ConsoleLogger();
137+
this._logger = resolvedLogger ?? new ConsoleLogger();
72138
this._pendingWorkItems = new Set();
73-
this._shutdownTimeoutMs = shutdownTimeoutMs ?? DEFAULT_SHUTDOWN_TIMEOUT_MS;
139+
this._shutdownTimeoutMs = resolvedShutdownTimeoutMs ?? DEFAULT_SHUTDOWN_TIMEOUT_MS;
74140
this._backoff = new ExponentialBackoff({
75141
initialDelayMs: 1000,
76142
maxDelayMs: 30000,
@@ -88,6 +154,34 @@ export class TaskHubGrpcWorker {
88154
return new grpc.Metadata();
89155
}
90156

157+
/**
158+
* Creates a new gRPC client and retries the worker.
159+
* Properly closes the old client to prevent connection leaks.
160+
*/
161+
private async _createNewClientAndRetry(): Promise<void> {
162+
// Close the old stub to prevent connection leaks
163+
if (this._stub) {
164+
this._stub.close();
165+
}
166+
167+
await this._backoff.wait();
168+
169+
const newClient = new GrpcClient(
170+
this._hostAddress,
171+
this._grpcChannelOptions,
172+
this._tls,
173+
this._grpcChannelCredentials,
174+
);
175+
this._stub = newClient.stub;
176+
177+
// Do not await - run in background
178+
this.internalRunWorker(newClient, true).catch((err) => {
179+
if (!this._stopWorker) {
180+
this._logger.error("Worker error:", err);
181+
}
182+
});
183+
}
184+
91185
/**
92186
* Registers an orchestrator function with the worker.
93187
*
@@ -216,21 +310,7 @@ export class TaskHubGrpcWorker {
216310
stream.removeAllListeners();
217311
stream.destroy();
218312
this._logger.info(`Stream abruptly closed, will retry in ${this._backoff.peekNextDelay()}ms...`);
219-
await this._backoff.wait();
220-
// Create a new client for the retry to avoid stale channel issues
221-
const newClient = new GrpcClient(
222-
this._hostAddress,
223-
this._grpcChannelOptions,
224-
this._tls,
225-
this._grpcChannelCredentials,
226-
);
227-
this._stub = newClient.stub;
228-
// do not await
229-
this.internalRunWorker(newClient, true).catch((err) => {
230-
if (!this._stopWorker) {
231-
this._logger.error("Worker error:", err);
232-
}
233-
});
313+
await this._createNewClientAndRetry();
234314
});
235315

236316
stream.on("error", (err: Error) => {
@@ -250,20 +330,7 @@ export class TaskHubGrpcWorker {
250330
throw err;
251331
}
252332
this._logger.info(`Connection will be retried in ${this._backoff.peekNextDelay()}ms...`);
253-
await this._backoff.wait();
254-
// Create a new client for the retry
255-
const newClient = new GrpcClient(
256-
this._hostAddress,
257-
this._grpcChannelOptions,
258-
this._tls,
259-
this._grpcChannelCredentials,
260-
);
261-
this._stub = newClient.stub;
262-
this.internalRunWorker(newClient, true).catch((retryErr) => {
263-
if (!this._stopWorker) {
264-
this._logger.error("Worker error:", retryErr);
265-
}
266-
});
333+
await this._createNewClientAndRetry();
267334
return;
268335
}
269336
}
@@ -323,8 +390,8 @@ export class TaskHubGrpcWorker {
323390
}
324391

325392
if (this._stub) {
326-
// Await the stub close operation to ensure gRPC client cleanup completes before returning.
327-
await Promise.resolve(this._stub.close());
393+
// Close the gRPC client - this is a synchronous operation
394+
this._stub.close();
328395
}
329396
this._isRunning = false;
330397

0 commit comments

Comments
 (0)