Skip to content

Commit c2e439c

Browse files
authored
Fix continueAsNew dropping fire-and-forget actions (sendEvent, signalEntity) (#136)
1 parent fe09e4f commit c2e439c

2 files changed

Lines changed: 48 additions & 2 deletions

File tree

packages/durabletask-js/src/worker/runtime-orchestration-context.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,6 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {
247247

248248
getActions(): pb.OrchestratorAction[] {
249249
if (this._completionStatus === pb.OrchestrationStatus.ORCHESTRATION_STATUS_CONTINUED_AS_NEW) {
250-
// Only return the single completion actions when continuing-as-new
251250
let carryoverEvents: pb.HistoryEvent[] | null = null;
252251

253252
if (this._saveEvents) {
@@ -271,7 +270,11 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {
271270
carryoverEvents,
272271
);
273272

274-
return [action];
273+
// Include fire-and-forget actions (sendEvent, signalEntity, etc.) that were
274+
// scheduled before continueAsNew was called, consistent with setComplete/setFailed
275+
const allActions = Object.values(this._pendingActions);
276+
allActions.push(action);
277+
return allActions;
275278
}
276279

277280
return Object.values(this._pendingActions);

packages/durabletask-js/test/in-memory-backend.spec.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,49 @@ describe("In-Memory Backend", () => {
229229
expect(state?.serializedOutput).toEqual(JSON.stringify(5));
230230
});
231231

232+
it("should preserve sendEvent actions when continuing-as-new", async () => {
233+
// Receiver orchestration that waits for an event
234+
const receiver: TOrchestrator = async function* (ctx: OrchestrationContext): any {
235+
const value = yield ctx.waitForExternalEvent("ping");
236+
return value;
237+
};
238+
239+
// Sender orchestration that sends an event then continues-as-new
240+
const sender: TOrchestrator = async (ctx: OrchestrationContext, input: { receiverId: string; iteration: number }) => {
241+
if (input.iteration === 1) {
242+
// On first iteration, send event to receiver then continue-as-new
243+
ctx.sendEvent(input.receiverId, "ping", "hello from sender");
244+
ctx.continueAsNew({ receiverId: input.receiverId, iteration: 2 }, false);
245+
} else {
246+
return "sender done";
247+
}
248+
};
249+
250+
worker.addOrchestrator(receiver);
251+
worker.addOrchestrator(sender);
252+
await worker.start();
253+
254+
// Start receiver first, then sender
255+
const receiverId = await client.scheduleNewOrchestration(receiver);
256+
await client.waitForOrchestrationStart(receiverId, false, 5);
257+
258+
const senderId = await client.scheduleNewOrchestration(sender, { receiverId, iteration: 1 });
259+
260+
// Wait for both to complete
261+
const senderState = await client.waitForOrchestrationCompletion(senderId, true, 10);
262+
const receiverState = await client.waitForOrchestrationCompletion(receiverId, true, 10);
263+
264+
// Sender should complete after continuing-as-new
265+
expect(senderState).toBeDefined();
266+
expect(senderState?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED);
267+
expect(senderState?.serializedOutput).toEqual(JSON.stringify("sender done"));
268+
269+
// Receiver should have received the event sent before continue-as-new
270+
expect(receiverState).toBeDefined();
271+
expect(receiverState?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED);
272+
expect(receiverState?.serializedOutput).toEqual(JSON.stringify("hello from sender"));
273+
});
274+
232275
it("should handle orchestration without activities", async () => {
233276
const orchestrator: TOrchestrator = async (_: OrchestrationContext, input: number) => {
234277
return input * 2;

0 commit comments

Comments
 (0)