Skip to content

Commit 888e0f3

Browse files
committed
Ensure concurrency is a valid number
1 parent e925149 commit 888e0f3

3 files changed

Lines changed: 84 additions & 7 deletions

File tree

src/sdk/clients/worker/Poller.ts

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,14 @@ export class Poller<T> {
3333
this.performWorkFunction = performWorkFunction;
3434
this.options = { ...this.options, ...pollerOptions };
3535
this.logger = logger || noopLogger;
36+
37+
// Ensure concurrency is a valid number
38+
if (typeof this.options.concurrency !== 'number' || isNaN(this.options.concurrency) || this.options.concurrency < 1) {
39+
this.logger.info(
40+
`Invalid concurrency value (${this.options.concurrency}) for poller ${pollerId}. Using default: ${DEFAULT_CONCURRENCY}`
41+
);
42+
this.options.concurrency = DEFAULT_CONCURRENCY;
43+
}
3644
}
3745

3846
get isPolling() {
@@ -77,12 +85,10 @@ export class Poller<T> {
7785
while (this.isPolling) {
7886
try {
7987
// Concurrency could have been updated. Accounting for that
80-
const count = Math.max(
81-
0,
82-
this.options.concurrency - this._tasksInProcess
83-
);
88+
const rawCount = (this.options.concurrency ?? DEFAULT_CONCURRENCY) - this._tasksInProcess;
89+
const count = Math.max(0, Number.isFinite(rawCount) ? rawCount : DEFAULT_CONCURRENCY);
8490

85-
if (count === 0) {
91+
if (count === 0 || !Number.isFinite(count)) {
8692
this.logger.debug(
8793
"Max in process reached, Will skip polling for " + this._pollerId
8894
);

src/sdk/clients/worker/TaskRunner.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@ export class TaskRunner {
7373
this.batchPoll,
7474
this.executeTask,
7575
{
76-
concurrency: worker.concurrency ?? options.concurrency,
77-
pollInterval: worker.pollInterval ?? options.pollInterval,
76+
concurrency: worker.concurrency ?? this.options.concurrency,
77+
pollInterval: worker.pollInterval ?? this.options.pollInterval,
7878
},
7979
this.logger
8080
);

src/sdk/clients/worker/__tests__/TaskRunner.test.ts

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1087,4 +1087,75 @@ describe("Polling state", () => {
10871087

10881088
expect(runner.isPolling).toBe(false);
10891089
});
1090+
1091+
test("handles undefined concurrency by using default", async () => {
1092+
const mockClient = createMockClient();
1093+
1094+
// Create worker with undefined concurrency
1095+
const args: RunnerArgs = {
1096+
worker: {
1097+
taskDefName: "test-undefined-concurrency",
1098+
execute: async () => ({
1099+
status: "COMPLETED",
1100+
outputData: { result: "ok" },
1101+
}),
1102+
// concurrency is intentionally undefined
1103+
},
1104+
options: {
1105+
pollInterval: 10,
1106+
workerID: "test-worker",
1107+
// concurrency is also undefined here
1108+
},
1109+
logger: mockLogger,
1110+
client: mockClient,
1111+
};
1112+
1113+
const mockTask: Task = {
1114+
taskId: "task-1",
1115+
workflowInstanceId: "workflow-1",
1116+
status: "IN_PROGRESS",
1117+
taskType: "test-undefined-concurrency",
1118+
inputData: {},
1119+
} as Task;
1120+
1121+
(TaskResource.batchPoll as jest.MockedFunction<typeof TaskResource.batchPoll>)
1122+
.mockResolvedValueOnce({
1123+
data: [mockTask],
1124+
request: {} as Request,
1125+
response: {} as Response,
1126+
} as Awaited<ReturnType<typeof TaskResource.batchPoll>>)
1127+
.mockResolvedValue({
1128+
data: [],
1129+
request: {} as Request,
1130+
response: {} as Response,
1131+
} as Awaited<ReturnType<typeof TaskResource.batchPoll>>);
1132+
1133+
(TaskResource.updateTask as jest.MockedFunction<typeof TaskResource.updateTask>)
1134+
.mockResolvedValue({
1135+
data: null,
1136+
request: {} as Request,
1137+
response: {} as Response,
1138+
} as Awaited<ReturnType<typeof TaskResource.updateTask>>);
1139+
1140+
const runner = new TaskRunner(args);
1141+
activeRunners.push(runner);
1142+
1143+
runner.startPolling();
1144+
1145+
// Wait for polling cycle
1146+
await new Promise(resolve => setTimeout(resolve, 50));
1147+
1148+
await runner.stopPolling();
1149+
1150+
// Verify batchPoll was called with a valid count (should be 1, the default concurrency)
1151+
expect(TaskResource.batchPoll).toHaveBeenCalled();
1152+
const batchPollCall = (TaskResource.batchPoll as jest.MockedFunction<typeof TaskResource.batchPoll>).mock.calls[0];
1153+
const queryParams = batchPollCall?.[0]?.query;
1154+
1155+
// Verify count is a valid number, not NaN
1156+
expect(queryParams?.count).toBeDefined();
1157+
expect(typeof queryParams?.count).toBe('number');
1158+
expect(Number.isFinite(queryParams?.count)).toBe(true);
1159+
expect(queryParams?.count).toBeGreaterThan(0);
1160+
});
10901161
});

0 commit comments

Comments
 (0)