Skip to content

Commit 805a82b

Browse files
committed
2 parents 5f82617 + 0376f94 commit 805a82b

16 files changed

Lines changed: 2540 additions & 13 deletions

packages/durabletask-js/src/client/client.ts

Lines changed: 118 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,12 @@ import { callWithMetadata, MetadataGenerator } from "../utils/grpc-helper.util";
2121
import { OrchestrationQuery, ListInstanceIdsOptions, DEFAULT_PAGE_SIZE } from "../orchestration/orchestration-query";
2222
import { Page, AsyncPageable, createAsyncPageable } from "../orchestration/page";
2323
import { FailureDetails } from "../task/failure-details";
24+
import { HistoryEvent } from "../orchestration/history-event";
25+
import { convertProtoHistoryEvent } from "../utils/history-event-converter";
2426
import { Logger, ConsoleLogger } from "../types/logger.type";
27+
import { StartOrchestrationOptions } from "../task/options";
28+
import { mapToRecord } from "../utils/tags.util";
29+
import { populateTagsMap } from "../utils/pb-helper.util";
2530

2631
// Re-export MetadataGenerator for backward compatibility
2732
export { MetadataGenerator } from "../utils/grpc-helper.util";
@@ -133,13 +138,46 @@ export class TaskHubGrpcClient {
133138
input?: TInput,
134139
instanceId?: string,
135140
startAt?: Date,
141+
): Promise<string>;
142+
/**
143+
* Schedules a new orchestrator using the DurableTask client.
144+
*
145+
* @param {TOrchestrator | string} orchestrator - The orchestrator or the name of the orchestrator to be scheduled.
146+
* @param {TInput} input - Optional input for the orchestrator.
147+
* @param {StartOrchestrationOptions} options - Options for instance ID, start time, and tags.
148+
* @return {Promise<string>} A Promise resolving to the unique ID of the scheduled orchestrator instance.
149+
*/
150+
async scheduleNewOrchestration(
151+
orchestrator: TOrchestrator | string,
152+
input?: TInput,
153+
options?: StartOrchestrationOptions,
154+
): Promise<string>;
155+
async scheduleNewOrchestration(
156+
orchestrator: TOrchestrator | string,
157+
input?: TInput,
158+
instanceIdOrOptions?: string | StartOrchestrationOptions,
159+
startAt?: Date,
136160
): Promise<string> {
137161
let name;
138162
if (typeof orchestrator === "string") {
139163
name = orchestrator;
140164
} else {
141165
name = getName(orchestrator);
142166
}
167+
168+
const instanceId =
169+
typeof instanceIdOrOptions === "string" || instanceIdOrOptions === undefined
170+
? instanceIdOrOptions
171+
: instanceIdOrOptions.instanceId;
172+
const scheduledStartAt =
173+
typeof instanceIdOrOptions === "string" || instanceIdOrOptions === undefined
174+
? startAt
175+
: instanceIdOrOptions.startAt;
176+
const tags =
177+
typeof instanceIdOrOptions === "string" || instanceIdOrOptions === undefined
178+
? undefined
179+
: instanceIdOrOptions.tags;
180+
143181
const req = new pb.CreateInstanceRequest();
144182
req.setName(name);
145183
req.setInstanceid(instanceId ?? randomUUID());
@@ -148,11 +186,13 @@ export class TaskHubGrpcClient {
148186
i.setValue(JSON.stringify(input));
149187

150188
const ts = new Timestamp();
151-
ts.fromDate(new Date(startAt?.getTime() ?? 0));
189+
ts.fromDate(new Date(scheduledStartAt?.getTime() ?? 0));
152190

153191
req.setInput(i);
154192
req.setScheduledstarttimestamp(ts);
155193

194+
populateTagsMap(req.getTagsMap(), tags);
195+
156196
this._logger.info(`Starting new ${name} instance with ID = ${req.getInstanceid()}`);
157197

158198
const res = await callWithMetadata<pb.CreateInstanceRequest, pb.CreateInstanceResponse>(
@@ -567,14 +607,15 @@ export class TaskHubGrpcClient {
567607
* @example
568608
* ```typescript
569609
* // Iterate over all matching instances
610+
* const logger = new ConsoleLogger();
570611
* const pageable = client.getAllInstances({ statuses: [OrchestrationStatus.COMPLETED] });
571612
* for await (const instance of pageable) {
572-
* console.log(instance.instanceId);
613+
* logger.info(instance.instanceId);
573614
* }
574615
*
575616
* // Iterate over pages
576617
* for await (const page of pageable.asPages()) {
577-
* console.log(`Page has ${page.values.length} items`);
618+
* logger.info(`Page has ${page.values.length} items`);
578619
* }
579620
* ```
580621
*
@@ -735,6 +776,77 @@ export class TaskHubGrpcClient {
735776
return new Page(instanceIds, lastInstanceKey);
736777
}
737778

779+
/**
780+
* Retrieves the history of the specified orchestration instance as a list of HistoryEvent objects.
781+
*
782+
* This method streams the history events from the backend and returns them as an array.
783+
* The history includes all events that occurred during the orchestration execution,
784+
* such as task scheduling, completion, failure, timer events, and more.
785+
*
786+
* If the orchestration instance does not exist, an empty array is returned.
787+
*
788+
* @param instanceId - The unique identifier of the orchestration instance.
789+
* @returns A Promise that resolves to an array of HistoryEvent objects representing
790+
* the orchestration's history. Returns an empty array if the instance is not found.
791+
* @throws {Error} If the instanceId is null or empty.
792+
* @throws {Error} If the operation is canceled.
793+
* @throws {Error} If an internal error occurs while retrieving the history.
794+
*
795+
* @example
796+
* ```typescript
797+
* const history = await client.getOrchestrationHistory(instanceId);
798+
* for (const event of history) {
799+
* console.log(`Event ${event.eventId}: ${event.type} at ${event.timestamp}`);
800+
* }
801+
* ```
802+
*/
803+
async getOrchestrationHistory(instanceId: string): Promise<HistoryEvent[]> {
804+
if (!instanceId) {
805+
throw new Error("instanceId is required");
806+
}
807+
808+
const req = new pb.StreamInstanceHistoryRequest();
809+
req.setInstanceid(instanceId);
810+
req.setForworkitemprocessing(false);
811+
812+
const metadata = this._metadataGenerator ? await this._metadataGenerator() : new grpc.Metadata();
813+
const stream = this._stub.streamInstanceHistory(req, metadata);
814+
815+
return new Promise<HistoryEvent[]>((resolve, reject) => {
816+
const historyEvents: HistoryEvent[] = [];
817+
818+
stream.on("data", (chunk: pb.HistoryChunk) => {
819+
const protoEvents = chunk.getEventsList();
820+
for (const protoEvent of protoEvents) {
821+
const event = convertProtoHistoryEvent(protoEvent);
822+
if (event) {
823+
historyEvents.push(event);
824+
}
825+
}
826+
});
827+
828+
stream.on("end", () => {
829+
stream.removeAllListeners();
830+
resolve(historyEvents);
831+
});
832+
833+
stream.on("error", (err: grpc.ServiceError) => {
834+
stream.removeAllListeners();
835+
// Return empty array for NOT_FOUND to be consistent with DTS behavior
836+
// (DTS returns empty stream for non-existent instances) and user-friendly
837+
if (err.code === grpc.status.NOT_FOUND) {
838+
resolve([]);
839+
} else if (err.code === grpc.status.CANCELLED) {
840+
reject(new Error(`The getOrchestrationHistory operation was canceled.`));
841+
} else if (err.code === grpc.status.INTERNAL) {
842+
reject(new Error(`An error occurred while retrieving the history for orchestration with instanceId '${instanceId}'.`));
843+
} else {
844+
reject(err);
845+
}
846+
});
847+
});
848+
}
849+
738850
/**
739851
* Helper method to create an OrchestrationState from a protobuf OrchestrationState.
740852
*/
@@ -779,6 +891,8 @@ export class TaskHubGrpcClient {
779891
);
780892
}
781893

894+
const tags = mapToRecord(protoState.getTagsMap());
895+
782896
return new OrchestrationState(
783897
instanceId,
784898
name ?? "",
@@ -789,6 +903,7 @@ export class TaskHubGrpcClient {
789903
serializedOutput,
790904
serializedCustomStatus,
791905
failureDetails,
906+
tags,
792907
);
793908
}
794909
}

packages/durabletask-js/src/index.ts

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,44 @@ export { OrchestrationState } from "./orchestration/orchestration-state";
1818
export { OrchestrationQuery, ListInstanceIdsOptions, DEFAULT_PAGE_SIZE } from "./orchestration/orchestration-query";
1919
export { Page, AsyncPageable, createAsyncPageable } from "./orchestration/page";
2020

21+
// History event types
22+
export {
23+
HistoryEvent,
24+
HistoryEventType,
25+
HistoryEventBase,
26+
ExecutionStartedEvent,
27+
ExecutionCompletedEvent,
28+
ExecutionTerminatedEvent,
29+
ExecutionSuspendedEvent,
30+
ExecutionResumedEvent,
31+
ExecutionRewoundEvent,
32+
TaskScheduledEvent,
33+
TaskCompletedEvent,
34+
TaskFailedEvent,
35+
SubOrchestrationInstanceCreatedEvent,
36+
SubOrchestrationInstanceCompletedEvent,
37+
SubOrchestrationInstanceFailedEvent,
38+
TimerCreatedEvent,
39+
TimerFiredEvent,
40+
OrchestratorStartedEvent,
41+
OrchestratorCompletedEvent,
42+
EventSentEvent,
43+
EventRaisedEvent,
44+
GenericEvent,
45+
HistoryStateEvent,
46+
ContinueAsNewEvent,
47+
OrchestrationInstance,
48+
ParentInstanceInfo,
49+
TraceContext,
50+
EntityOperationSignaledEvent,
51+
EntityOperationCalledEvent,
52+
EntityOperationCompletedEvent,
53+
EntityOperationFailedEvent,
54+
EntityLockRequestedEvent,
55+
EntityLockGrantedEvent,
56+
EntityUnlockSentEvent,
57+
} from "./orchestration/history-event";
58+
2159
// Proto types (for advanced usage)
2260
export { OrchestrationStatus as ProtoOrchestrationStatus } from "./proto/orchestrator_service_pb";
2361

@@ -27,7 +65,13 @@ export { Task } from "./task/task";
2765

2866
// Retry policies and task options
2967
export { RetryPolicy, RetryPolicyOptions } from "./task/retry";
30-
export { TaskOptions, SubOrchestrationOptions, taskOptionsFromRetryPolicy, subOrchestrationOptionsFromRetryPolicy } from "./task/options";
68+
export {
69+
TaskOptions,
70+
SubOrchestrationOptions,
71+
StartOrchestrationOptions,
72+
taskOptionsFromRetryPolicy,
73+
subOrchestrationOptionsFromRetryPolicy,
74+
} from "./task/options";
3175

3276
// Types
3377
export { TOrchestrator } from "./types/orchestrator.type";

0 commit comments

Comments
 (0)