Skip to content

Commit 5ddd37d

Browse files
committed
implement getinstances
remove real dts test
1 parent cb2d5ac commit 5ddd37d

File tree

6 files changed

+1349
-1
lines changed

6 files changed

+1349
-1
lines changed

packages/durabletask-js/src/client/client.ts

Lines changed: 241 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@ import { randomUUID } from "crypto";
1313
import { newOrchestrationState } from "../orchestration";
1414
import { OrchestrationState } from "../orchestration/orchestration-state";
1515
import { GrpcClient } from "./client-grpc";
16-
import { OrchestrationStatus, toProtobuf } from "../orchestration/enum/orchestration-status.enum";
16+
import { OrchestrationStatus, toProtobuf, fromProtobuf } from "../orchestration/enum/orchestration-status.enum";
1717
import { TimeoutError } from "../exception/timeout-error";
1818
import { PurgeResult } from "../orchestration/orchestration-purge-result";
1919
import { PurgeInstanceCriteria } from "../orchestration/orchestration-purge-criteria";
2020
import { callWithMetadata, MetadataGenerator } from "../utils/grpc-helper.util";
21+
import { OrchestrationQuery, ListInstanceIdsOptions, DEFAULT_PAGE_SIZE } from "../orchestration/orchestration-query";
22+
import { Page, AsyncPageable, createAsyncPageable } from "../orchestration/page";
2123

2224
// Re-export MetadataGenerator for backward compatibility
2325
export { MetadataGenerator } from "../utils/grpc-helper.util";
@@ -384,4 +386,242 @@ export class TaskHubGrpcClient {
384386
}
385387
return new PurgeResult(res.getDeletedinstancecount());
386388
}
389+
390+
/**
391+
* Queries orchestration instances and returns an async iterable of results.
392+
*
393+
* This method supports querying orchestration instances by various filter criteria including
394+
* creation time range, runtime status, instance ID prefix, and task hub names.
395+
*
396+
* The results are returned as an AsyncPageable that supports both iteration over individual
397+
* items and iteration over pages.
398+
*
399+
* @example
400+
* ```typescript
401+
* // Iterate over all matching instances
402+
* const pageable = client.getAllInstances({ statuses: [OrchestrationStatus.COMPLETED] });
403+
* for await (const instance of pageable) {
404+
* console.log(instance.instanceId);
405+
* }
406+
*
407+
* // Iterate over pages
408+
* for await (const page of pageable.asPages()) {
409+
* console.log(`Page has ${page.values.length} items`);
410+
* }
411+
* ```
412+
*
413+
* @param filter - Optional filter criteria for the query.
414+
* @returns An AsyncPageable of OrchestrationState objects.
415+
*/
416+
getAllInstances(filter?: OrchestrationQuery): AsyncPageable<OrchestrationState> {
417+
return createAsyncPageable<OrchestrationState>(
418+
async (continuationToken?: string, pageSize?: number): Promise<Page<OrchestrationState>> => {
419+
const req = new pb.QueryInstancesRequest();
420+
const query = new pb.InstanceQuery();
421+
422+
// Set created time range
423+
if (filter?.createdFrom) {
424+
const timestamp = new Timestamp();
425+
timestamp.fromDate(filter.createdFrom);
426+
query.setCreatedtimefrom(timestamp);
427+
}
428+
429+
if (filter?.createdTo) {
430+
const timestamp = new Timestamp();
431+
timestamp.fromDate(filter.createdTo);
432+
query.setCreatedtimeto(timestamp);
433+
}
434+
435+
// Set runtime statuses
436+
if (filter?.statuses) {
437+
for (const status of filter.statuses) {
438+
query.addRuntimestatus(toProtobuf(status));
439+
}
440+
}
441+
442+
// Set task hub names
443+
if (filter?.taskHubNames) {
444+
for (const name of filter.taskHubNames) {
445+
const stringValue = new StringValue();
446+
stringValue.setValue(name);
447+
query.addTaskhubnames(stringValue);
448+
}
449+
}
450+
451+
// Set instance ID prefix
452+
if (filter?.instanceIdPrefix) {
453+
const prefixValue = new StringValue();
454+
prefixValue.setValue(filter.instanceIdPrefix);
455+
query.setInstanceidprefix(prefixValue);
456+
}
457+
458+
// Set page size
459+
const effectivePageSize = pageSize ?? filter?.pageSize ?? DEFAULT_PAGE_SIZE;
460+
query.setMaxinstancecount(effectivePageSize);
461+
462+
// Set continuation token (use provided or from filter)
463+
const token = continuationToken ?? filter?.continuationToken;
464+
if (token) {
465+
const tokenValue = new StringValue();
466+
tokenValue.setValue(token);
467+
query.setContinuationtoken(tokenValue);
468+
}
469+
470+
// Set fetch inputs and outputs
471+
query.setFetchinputsandoutputs(filter?.fetchInputsAndOutputs ?? false);
472+
473+
req.setQuery(query);
474+
475+
const response = await callWithMetadata<pb.QueryInstancesRequest, pb.QueryInstancesResponse>(
476+
this._stub.queryInstances.bind(this._stub),
477+
req,
478+
this._metadataGenerator,
479+
);
480+
481+
const states: OrchestrationState[] = [];
482+
const orchestrationStateList = response.getOrchestrationstateList();
483+
for (const state of orchestrationStateList) {
484+
const orchestrationState = this._createOrchestrationStateFromProto(state, filter?.fetchInputsAndOutputs ?? false);
485+
if (orchestrationState) {
486+
states.push(orchestrationState);
487+
}
488+
}
489+
490+
const responseContinuationToken = response.getContinuationtoken()?.getValue();
491+
return new Page(states, responseContinuationToken);
492+
},
493+
);
494+
}
495+
496+
/**
497+
* Lists orchestration instance IDs that match the specified runtime status
498+
* and completed time range, using key-based pagination.
499+
*
500+
* This method is optimized for listing instance IDs without fetching full instance metadata,
501+
* making it more efficient when only instance IDs are needed.
502+
*
503+
* @example
504+
* ```typescript
505+
* // Get first page of completed instances
506+
* const page = await client.listInstanceIds({
507+
* runtimeStatus: [OrchestrationStatus.COMPLETED],
508+
* pageSize: 50
509+
* });
510+
*
511+
* // Get next page using the continuation key
512+
* if (page.hasMoreResults) {
513+
* const nextPage = await client.listInstanceIds({
514+
* runtimeStatus: [OrchestrationStatus.COMPLETED],
515+
* pageSize: 50,
516+
* lastInstanceKey: page.continuationToken
517+
* });
518+
* }
519+
* ```
520+
*
521+
* @param options - Optional filter criteria and pagination options.
522+
* @returns A Promise that resolves to a Page of instance IDs.
523+
*/
524+
async listInstanceIds(options?: ListInstanceIdsOptions): Promise<Page<string>> {
525+
const req = new pb.ListInstanceIdsRequest();
526+
527+
// Set page size
528+
const pageSize = options?.pageSize ?? DEFAULT_PAGE_SIZE;
529+
req.setPagesize(pageSize);
530+
531+
// Set last instance key (continuation token)
532+
if (options?.lastInstanceKey) {
533+
const keyValue = new StringValue();
534+
keyValue.setValue(options.lastInstanceKey);
535+
req.setLastinstancekey(keyValue);
536+
}
537+
538+
// Set runtime statuses
539+
if (options?.runtimeStatus) {
540+
for (const status of options.runtimeStatus) {
541+
req.addRuntimestatus(toProtobuf(status));
542+
}
543+
}
544+
545+
// Set completed time range
546+
if (options?.completedTimeFrom) {
547+
const timestamp = new Timestamp();
548+
timestamp.fromDate(options.completedTimeFrom);
549+
req.setCompletedtimefrom(timestamp);
550+
}
551+
552+
if (options?.completedTimeTo) {
553+
const timestamp = new Timestamp();
554+
timestamp.fromDate(options.completedTimeTo);
555+
req.setCompletedtimeto(timestamp);
556+
}
557+
558+
const response = await callWithMetadata<pb.ListInstanceIdsRequest, pb.ListInstanceIdsResponse>(
559+
this._stub.listInstanceIds.bind(this._stub),
560+
req,
561+
this._metadataGenerator,
562+
);
563+
564+
const instanceIds = response.getInstanceidsList();
565+
const lastInstanceKey = response.getLastinstancekey()?.getValue();
566+
567+
return new Page(instanceIds, lastInstanceKey);
568+
}
569+
570+
/**
571+
* Helper method to create an OrchestrationState from a protobuf OrchestrationState.
572+
*/
573+
private _createOrchestrationStateFromProto(
574+
protoState: pb.OrchestrationState,
575+
fetchPayloads: boolean,
576+
): OrchestrationState | undefined {
577+
const instanceId = protoState.getInstanceid();
578+
const name = protoState.getName();
579+
const runtimeStatus = protoState.getOrchestrationstatus();
580+
const createdTimestamp = protoState.getCreatedtimestamp();
581+
const lastUpdatedTimestamp = protoState.getLastupdatedtimestamp();
582+
583+
if (!instanceId) {
584+
return undefined;
585+
}
586+
587+
const createdAt = createdTimestamp ? new Date(createdTimestamp.toDate()) : new Date(0);
588+
const lastUpdatedAt = lastUpdatedTimestamp ? new Date(lastUpdatedTimestamp.toDate()) : new Date(0);
589+
590+
// Map proto status to our status enum using the existing conversion function
591+
const status = fromProtobuf(runtimeStatus);
592+
593+
let serializedInput: string | undefined;
594+
let serializedOutput: string | undefined;
595+
let serializedCustomStatus: string | undefined;
596+
597+
if (fetchPayloads) {
598+
serializedInput = protoState.getInput()?.getValue();
599+
serializedOutput = protoState.getOutput()?.getValue();
600+
serializedCustomStatus = protoState.getCustomstatus()?.getValue();
601+
}
602+
603+
// Extract failure details if present
604+
let failureDetails;
605+
const protoFailureDetails = protoState.getFailuredetails();
606+
if (protoFailureDetails) {
607+
const { FailureDetails } = require("../task/failure-details");
608+
failureDetails = new FailureDetails(
609+
protoFailureDetails.getErrortype(),
610+
protoFailureDetails.getErrormessage(),
611+
protoFailureDetails.getStacktrace()?.getValue(),
612+
);
613+
}
614+
615+
return new OrchestrationState(
616+
instanceId,
617+
name,
618+
status,
619+
createdAt,
620+
lastUpdatedAt,
621+
serializedInput,
622+
serializedOutput,
623+
serializedCustomStatus,
624+
failureDetails,
625+
);
626+
}
387627
}

packages/durabletask-js/src/index.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@ export { ActivityContext } from "./task/context/activity-context";
1212
// Orchestration types and utilities
1313
export { PurgeInstanceCriteria } from "./orchestration/orchestration-purge-criteria";
1414
export { OrchestrationStatus } from "./orchestration/enum/orchestration-status.enum";
15+
export { OrchestrationState } from "./orchestration/orchestration-state";
16+
17+
// Query types
18+
export { OrchestrationQuery, ListInstanceIdsOptions, DEFAULT_PAGE_SIZE } from "./orchestration/orchestration-query";
19+
export { Page, AsyncPageable, createAsyncPageable } from "./orchestration/page";
1520

1621
// Proto types (for advanced usage)
1722
export { OrchestrationStatus as ProtoOrchestrationStatus } from "./proto/orchestrator_service_pb";
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
import { OrchestrationStatus } from "./enum/orchestration-status.enum";
5+
6+
/**
7+
* Default page size when not supplied.
8+
*/
9+
export const DEFAULT_PAGE_SIZE = 100;
10+
11+
/**
12+
* A filter for querying orchestration instances.
13+
*/
14+
export interface OrchestrationQuery {
15+
/**
16+
* Filter by orchestrations created on or after this date.
17+
*/
18+
createdFrom?: Date;
19+
20+
/**
21+
* Filter by orchestrations created on or before this date.
22+
*/
23+
createdTo?: Date;
24+
25+
/**
26+
* Filter by orchestration runtime statuses.
27+
*/
28+
statuses?: OrchestrationStatus[];
29+
30+
/**
31+
* Names of task hubs to query across.
32+
*/
33+
taskHubNames?: string[];
34+
35+
/**
36+
* Prefix of instance IDs to include.
37+
*/
38+
instanceIdPrefix?: string;
39+
40+
/**
41+
* Maximum number of items to include per page.
42+
* @default 100
43+
*/
44+
pageSize?: number;
45+
46+
/**
47+
* Whether to include instance inputs and outputs in the query results.
48+
* @default false
49+
*/
50+
fetchInputsAndOutputs?: boolean;
51+
52+
/**
53+
* The continuation token to continue a paged query.
54+
*/
55+
continuationToken?: string;
56+
}
57+
58+
/**
59+
* Options for listing instance IDs with key-based pagination.
60+
*/
61+
export interface ListInstanceIdsOptions {
62+
/**
63+
* Optional set of runtime statuses to filter by. If undefined, all statuses are included.
64+
*/
65+
runtimeStatus?: OrchestrationStatus[];
66+
67+
/**
68+
* Inclusive lower bound of the orchestration completed time filter.
69+
*/
70+
completedTimeFrom?: Date;
71+
72+
/**
73+
* Inclusive upper bound of the orchestration completed time filter.
74+
*/
75+
completedTimeTo?: Date;
76+
77+
/**
78+
* Maximum number of instance IDs to return in a single page.
79+
* @default 100
80+
*/
81+
pageSize?: number;
82+
83+
/**
84+
* Continuation key from the previous page. If undefined, listing starts from the beginning.
85+
*/
86+
lastInstanceKey?: string;
87+
}

0 commit comments

Comments
 (0)