Skip to content

Commit 77c2667

Browse files
committed
fix: Reorder stream cleanup to prevent unhandled CANCELLED errors
- Cancel stream before removing listeners in stop() so error handler can suppress CANCELLED - Remove unnecessary stream.cancel() in 'end' handler (stream already closed) - Add .catch() handlers to all recursive internalRunWorker calls
1 parent 2457971 commit 77c2667

File tree

1 file changed

+21
-8
lines changed

1 file changed

+21
-8
lines changed

packages/durabletask-js/src/worker/task-hub-grpc-worker.ts

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -206,14 +206,15 @@ export class TaskHubGrpcWorker {
206206

207207
// Wait for the stream to end or error
208208
stream.on("end", async () => {
209-
// Clean up event listeners to prevent memory leaks
210-
stream.removeAllListeners();
211-
stream.cancel();
212-
stream.destroy();
213209
if (this._stopWorker) {
214210
this._logger.info("Stream ended");
211+
stream.removeAllListeners();
212+
stream.destroy();
215213
return;
216214
}
215+
// Stream ended unexpectedly - clean up and retry
216+
stream.removeAllListeners();
217+
stream.destroy();
217218
this._logger.info(`Stream abruptly closed, will retry in ${this._backoff.peekNextDelay()}ms...`);
218219
await this._backoff.wait();
219220
// Create a new client for the retry to avoid stale channel issues
@@ -225,7 +226,11 @@ export class TaskHubGrpcWorker {
225226
);
226227
this._stub = newClient.stub;
227228
// do not await
228-
this.internalRunWorker(newClient, true);
229+
this.internalRunWorker(newClient, true).catch((err) => {
230+
if (!this._stopWorker) {
231+
this._logger.error(`Worker error: ${err}`);
232+
}
233+
});
229234
});
230235

231236
stream.on("error", (err: Error) => {
@@ -254,7 +259,11 @@ export class TaskHubGrpcWorker {
254259
this._grpcChannelCredentials,
255260
);
256261
this._stub = newClient.stub;
257-
this.internalRunWorker(newClient, true);
262+
this.internalRunWorker(newClient, true).catch((retryErr) => {
263+
if (!this._stopWorker) {
264+
this._logger.error(`Worker error: ${retryErr}`);
265+
}
266+
});
258267
return;
259268
}
260269
}
@@ -270,9 +279,13 @@ export class TaskHubGrpcWorker {
270279

271280
this._stopWorker = true;
272281

273-
// Clean up stream listeners to prevent memory leaks
274-
this._responseStream?.removeAllListeners();
282+
// Cancel stream first while error handlers are still attached
283+
// This allows the error handler to suppress CANCELLED errors
275284
this._responseStream?.cancel();
285+
// Brief pause to let cancellation error propagate to handlers
286+
await sleep(10);
287+
// Now safe to remove listeners and destroy
288+
this._responseStream?.removeAllListeners();
276289
this._responseStream?.destroy();
277290

278291
// Wait for pending work items to complete with timeout

0 commit comments

Comments
 (0)