Skip to content

Commit 2933761

Browse files
committed
rewind instance
1 parent d24ebc5 commit 2933761

2 files changed

Lines changed: 284 additions & 0 deletions

File tree

packages/durabletask-js/src/client/client.ts

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,64 @@ export class TaskHubGrpcClient {
315315
);
316316
}
317317

318+
/**
 * Asks the backend to rewind an orchestration instance so it can be retried from its point of failure.
 *
 * Per the API contract, only instances in the `Failed` state can be rewound; the backend is
 * responsible for enforcing this and reports violations through gRPC status codes, which this
 * method translates into descriptive errors.
 *
 * Translated errors keep the original gRPC error on their `cause` property so callers can still
 * inspect the status code and server-provided details.
 *
 * @param instanceId - The unique identifier of the orchestration instance to rewind. Must be non-empty.
 * @param reason - A reason string describing why the orchestration is being rewound. It is only
 *   attached to the request when non-empty.
 * @throws {Error} If `instanceId` is empty.
 * @throws {Error} If the orchestration instance is not found (gRPC `NOT_FOUND`).
 * @throws {Error} If the instance is in a state that does not allow rewinding (gRPC `FAILED_PRECONDITION`).
 * @throws {Error} If the rewind operation is not supported by the backend (gRPC `UNIMPLEMENTED`).
 * @throws {Error} If the call was cancelled (gRPC `CANCELLED`).
 */
async rewindInstance(instanceId: string, reason: string): Promise<void> {
  if (!instanceId) {
    throw new Error("instanceId is required");
  }

  const req = new pb.RewindInstanceRequest();
  req.setInstanceid(instanceId);

  // Leave the reason unset on the request when the caller did not supply one.
  if (reason) {
    const reasonValue = new StringValue();
    reasonValue.setValue(reason);
    req.setReason(reasonValue);
  }

  console.log(`Rewinding '${instanceId}' with reason: ${reason}`);

  try {
    await callWithMetadata<pb.RewindInstanceRequest, pb.RewindInstanceResponse>(
      this._stub.rewindInstance.bind(this._stub),
      req,
      this._metadataGenerator,
    );
  } catch (e) {
    // Translate the gRPC status codes we recognise into friendlier messages.
    if (e && typeof e === "object" && "code" in e) {
      const grpcError = e as { code: number; details?: string };
      let message: string | undefined;

      switch (grpcError.code) {
        case grpc.status.NOT_FOUND:
          message = `An orchestration with the instanceId '${instanceId}' was not found.`;
          break;
        case grpc.status.FAILED_PRECONDITION:
          message =
            grpcError.details ||
            `Cannot rewind orchestration '${instanceId}': it is in a state that does not allow rewinding.`;
          break;
        case grpc.status.UNIMPLEMENTED:
          message = grpcError.details || `The rewind operation is not supported by the backend.`;
          break;
        case grpc.status.CANCELLED:
          message = `The rewind operation for '${instanceId}' was cancelled.`;
          break;
        default:
          break;
      }

      if (message !== undefined) {
        // Keep the original gRPC error reachable so the status code and stack are not lost.
        throw Object.assign(new Error(message), { cause: e });
      }
    }

    // Anything we do not recognise is propagated untouched.
    throw e;
  }
}
375+
318376
/**
319377
* Purges orchestration instance metadata from the durable store.
320378
*
Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
/**
5+
* E2E tests for rewindInstance API.
6+
*
7+
* NOTE: These tests can run against either:
8+
* 1. DTS emulator - set ENDPOINT and TASKHUB environment variables
9+
* 2. Real Azure DTS - set AZURE_DTS_CONNECTION_STRING environment variable
10+
*
11+
* Example for emulator:
12+
* docker run -i -p 8080:8080 -d mcr.microsoft.com/dts/dts-emulator:latest
13+
* ENDPOINT=localhost:8080 TASKHUB=default npx jest rewind.spec.ts
14+
*
15+
* Example for real DTS:
16+
* AZURE_DTS_CONNECTION_STRING="Endpoint=https://...;Authentication=DefaultAzure;TaskHub=th3" npx jest rewind.spec.ts
17+
*/
18+
19+
import {
20+
TaskHubGrpcClient,
21+
TaskHubGrpcWorker,
22+
OrchestrationStatus,
23+
OrchestrationContext,
24+
TOrchestrator,
25+
} from "@microsoft/durabletask-js";
26+
import {
27+
DurableTaskAzureManagedClientBuilder,
28+
DurableTaskAzureManagedWorkerBuilder,
29+
} from "@microsoft/durabletask-js-azuremanaged";
30+
31+
// Connection settings come from the environment. When AZURE_DTS_CONNECTION_STRING is set,
// createClient/createWorker use it; otherwise they fall back to ENDPOINT and TASKHUB.
// Because `||` is used, an unset OR empty ENDPOINT/TASKHUB resolves to the defaults
// "localhost:8080" and "default" respectively.
32+
const connectionString = process.env.AZURE_DTS_CONNECTION_STRING;
33+
const endpoint = process.env.ENDPOINT || "localhost:8080";
34+
const taskHub = process.env.TASKHUB || "default";
35+
36+
/**
 * Builds the client used by the tests.
 *
 * Uses the connection string when one is configured; otherwise targets the
 * endpoint/task hub pair, passing `null` as the builder's third argument.
 */
function createClient(): TaskHubGrpcClient {
  const clientBuilder = new DurableTaskAzureManagedClientBuilder();
  const configured = connectionString
    ? clientBuilder.connectionString(connectionString)
    : clientBuilder.endpoint(endpoint, taskHub, null);
  return configured.build();
}
43+
44+
/**
 * Builds the worker used by the tests.
 *
 * Uses the connection string when one is configured; otherwise targets the
 * endpoint/task hub pair, passing `null` as the builder's third argument.
 */
function createWorker(): TaskHubGrpcWorker {
  const workerBuilder = new DurableTaskAzureManagedWorkerBuilder();
  const configured = connectionString
    ? workerBuilder.connectionString(connectionString)
    : workerBuilder.endpoint(endpoint, taskHub, null);
  return configured.build();
}
51+
52+
describe("Rewind Instance E2E Tests", () => {
  let taskHubClient: TaskHubGrpcClient;
  let taskHubWorker: TaskHubGrpcWorker;
  // Tracks whether the current test started the worker, so teardown only stops what was started.
  let workerStarted = false;

  // Helper to start worker and track state
  const startWorker = async () => {
    await taskHubWorker.start();
    workerStarted = true;
  };

  beforeEach(async () => {
    workerStarted = false;
    taskHubClient = createClient();
    taskHubWorker = createWorker();
  });

  afterEach(async () => {
    try {
      if (workerStarted) {
        await taskHubWorker.stop();
      }
    } finally {
      // Always release the client, even when stopping the worker throws.
      await taskHubClient.stop();
    }
  });

  describe("rewindInstance - positive cases", () => {
    // Shared counter of orchestrator executions; reset before every test.
    let attemptCount = 0;

    // An orchestrator that fails on its first execution and succeeds on subsequent ones.
    // The counter is shared across the suite (not keyed per instance) and the input is ignored.
    const failOnceOrchestrator: TOrchestrator = async (_ctx: OrchestrationContext, _input: number) => {
      attemptCount++;
      if (attemptCount === 1) {
        throw new Error("First attempt failed!");
      }
      return `Success on attempt ${attemptCount}`;
    };

    beforeEach(() => {
      attemptCount = 0;
    });

    // Skip these tests if the backend doesn't support rewind (emulator returns UNIMPLEMENTED)
    it.skip("should rewind a failed orchestration instance (requires backend support)", async () => {
      const instanceId = `rewind-test-${Date.now()}`;

      taskHubWorker.addOrchestrator(failOnceOrchestrator);
      await startWorker();

      // Schedule the orchestration - it will fail on first attempt
      await taskHubClient.scheduleNewOrchestration(failOnceOrchestrator, 1, instanceId);

      // Wait for the orchestration to fail
      const failedState = await taskHubClient.waitForOrchestrationCompletion(instanceId, true, 30);
      expect(failedState).toBeDefined();
      expect(failedState?.runtimeStatus).toBe(OrchestrationStatus.FAILED);
      expect(failedState?.failureDetails?.message).toContain("First attempt failed!");

      // Now rewind the failed orchestration
      await taskHubClient.rewindInstance(instanceId, "Testing rewind functionality");

      // The orchestration should now be running again
      // Wait for it to complete (successfully this time)
      const rewoundState = await taskHubClient.waitForOrchestrationCompletion(instanceId, true, 30);
      expect(rewoundState).toBeDefined();
      expect(rewoundState?.runtimeStatus).toBe(OrchestrationStatus.COMPLETED);
      expect(rewoundState?.serializedOutput).toContain("Success on attempt");
    });

    it.skip("should rewind a failed orchestration with a descriptive reason (requires backend support)", async () => {
      const instanceId = `rewind-reason-${Date.now()}`;
      const rewindReason = "Rewinding due to transient network failure";

      taskHubWorker.addOrchestrator(failOnceOrchestrator);
      await startWorker();

      // Schedule and wait for failure
      await taskHubClient.scheduleNewOrchestration(failOnceOrchestrator, 1, instanceId);
      const failedState = await taskHubClient.waitForOrchestrationCompletion(instanceId, true, 30);
      expect(failedState?.runtimeStatus).toBe(OrchestrationStatus.FAILED);

      // Rewind with a specific reason
      await taskHubClient.rewindInstance(instanceId, rewindReason);

      // Verify it can complete after rewind
      const rewoundState = await taskHubClient.waitForOrchestrationCompletion(instanceId, true, 30);
      expect(rewoundState?.runtimeStatus).toBe(OrchestrationStatus.COMPLETED);
    });
  });

  describe("rewindInstance - negative cases", () => {
    // A simple orchestrator that completes successfully
    const simpleOrchestrator: TOrchestrator = async (_ctx: OrchestrationContext, input: number) => {
      return input * 2;
    };

    // An orchestrator that waits for an event (stays in Running state)
    const waitingOrchestrator: TOrchestrator = async (ctx: OrchestrationContext) => {
      const approval = await ctx.waitForExternalEvent("approval");
      return `Approved: ${approval}`;
    };

    it("should throw an error when rewinding a non-existent instance (or if rewind is not supported)", async () => {
      const nonExistentId = `non-existent-${Date.now()}-${Math.random().toString(36).substring(7)}`;

      // No need to start worker for this test
      // Will throw either "not found" or "not supported" depending on backend
      await expect(taskHubClient.rewindInstance(nonExistentId, "Test rewind")).rejects.toThrow();
    });

    it("should throw an error when rewinding a completed orchestration (or if rewind is not supported)", async () => {
      const instanceId = `rewind-completed-${Date.now()}`;

      taskHubWorker.addOrchestrator(simpleOrchestrator);
      await startWorker();

      // Schedule and wait for completion
      await taskHubClient.scheduleNewOrchestration(simpleOrchestrator, 5, instanceId);
      const state = await taskHubClient.waitForOrchestrationCompletion(instanceId, true, 30);
      expect(state?.runtimeStatus).toBe(OrchestrationStatus.COMPLETED);

      // Try to rewind a completed orchestration - should fail
      await expect(taskHubClient.rewindInstance(instanceId, "Test rewind")).rejects.toThrow();
    });

    it.skip("should throw an error when rewinding a running orchestration (requires backend support)", async () => {
      const instanceId = `rewind-running-${Date.now()}`;

      taskHubWorker.addOrchestrator(waitingOrchestrator);
      await startWorker();

      // Schedule the orchestration (will be waiting for event)
      await taskHubClient.scheduleNewOrchestration(waitingOrchestrator, undefined, instanceId);

      // Wait for it to start running
      await taskHubClient.waitForOrchestrationStart(instanceId, false, 30);

      // Try to rewind a running orchestration - should fail
      try {
        await taskHubClient.rewindInstance(instanceId, "Test rewind");
        // If we get here, the operation didn't throw - which might be expected on some backends
      } catch (e) {
        expect((e as Error).message).toMatch(/not allow|precondition|running/i);
      } finally {
        // Clean up: terminate the waiting orchestration
        await taskHubClient.terminateOrchestration(instanceId, "Test cleanup");
        await taskHubClient.waitForOrchestrationCompletion(instanceId, false, 30);
      }
    });

    it.skip("should throw an error when rewinding a terminated orchestration (requires backend support)", async () => {
      const instanceId = `rewind-terminated-${Date.now()}`;

      taskHubWorker.addOrchestrator(waitingOrchestrator);
      await startWorker();

      // Schedule the orchestration
      await taskHubClient.scheduleNewOrchestration(waitingOrchestrator, undefined, instanceId);
      await taskHubClient.waitForOrchestrationStart(instanceId, false, 30);

      // Terminate it
      await taskHubClient.terminateOrchestration(instanceId, "Terminating for test");
      const terminatedState = await taskHubClient.waitForOrchestrationCompletion(instanceId, false, 30);
      expect(terminatedState?.runtimeStatus).toBe(OrchestrationStatus.TERMINATED);

      // Try to rewind a terminated orchestration - should fail
      await expect(taskHubClient.rewindInstance(instanceId, "Test rewind")).rejects.toThrow();
    });

    it("should throw an error when instanceId is empty", async () => {
      await expect(taskHubClient.rewindInstance("", "Test rewind")).rejects.toThrow("instanceId is required");
    });
  });
});

0 commit comments

Comments
 (0)