Skip to content

Commit 85fc923

Browse files
fix(bpmn-service): add new worker key to handle OptimisticLockingException error (#2245)
* fix(bpmn-service): add new worker key to handle OptimisticLockingException error * refactor(bpmn-service): rename inProgress to isInProgress * test(bpmn-service): add test cases for worker implementation provider * refactor(bpmn-service): remove multiple expect from test case --------- Co-authored-by: Namandeep Singh <namandeep.singh>
1 parent 70a4c53 commit 85fc923

6 files changed

Lines changed: 183 additions & 8 deletions

File tree

services/bpmn-service/src/__tests__/unit/register-worker.unit.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
//
33
// This software is released under the MIT License.
44
// https://opensource.org/licenses/MIT
5-
import {WorkerRegisterFnProvider} from '../../providers/register-worker.service';
5+
import {AnyObject} from '@loopback/repository';
66
import {expect} from '@loopback/testlab';
7+
import {WorkerRegisterFnProvider} from '../../providers/register-worker.service';
78
import {BPMTask, WorkerMap} from '../../types';
8-
import {AnyObject} from '@loopback/repository';
99

1010
describe('RegisterWorker unit', () => {
1111
let map: WorkerMap = {};
@@ -34,23 +34,27 @@ describe('RegisterWorker unit', () => {
3434
command: task1,
3535
running: false,
3636
topic: 'topic1',
37+
isInProgress: false,
3738
},
3839
{
3940
command: task2,
4041
running: false,
4142
topic: 'topic2',
43+
isInProgress: false,
4244
},
4345
],
4446
workflow2: [
4547
{
4648
command: task3,
4749
running: false,
4850
topic: 'topic3',
51+
isInProgress: false,
4952
},
5053
{
5154
command: task4,
5255
running: false,
5356
topic: 'topic4',
57+
isInProgress: false,
5458
},
5559
],
5660
};
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
// Copyright (c) 2023 Sourcefuse Technologies
2+
//
3+
// This software is released under the MIT License.
4+
// https://opensource.org/licenses/MIT
5+
import {AnyObject} from '@loopback/repository';
6+
import {expect} from '@loopback/testlab';
7+
import {ICommand, ILogger} from '@sourceloop/core';
8+
import {Client} from 'camunda-external-task-client-js';
9+
import sinon from 'sinon';
10+
import {WorkerImplementationProvider} from '../../providers/worker-implementation.provider';
11+
import {BPMTask, IWorkflowServiceConfig, WorkerNameCmdPair} from '../../types';
12+
13+
describe('WorkerImplementationProvider Unit Tests', () => {
14+
let worker: WorkerNameCmdPair;
15+
let provider: WorkerImplementationProvider;
16+
let mockClient: sinon.SinonStubbedInstance<Client>;
17+
let mockLogger: sinon.SinonStubbedInstance<ILogger>;
18+
let clock: sinon.SinonFakeTimers;
19+
20+
beforeEach(() => {
21+
// Mock Logger
22+
mockLogger = {
23+
info: sinon.stub(),
24+
error: sinon.stub(),
25+
warn: sinon.stub(),
26+
debug: sinon.stub(),
27+
} as unknown as sinon.SinonStubbedInstance<ILogger>;
28+
29+
// Mock Workflow Service Config
30+
const config: IWorkflowServiceConfig = {
31+
useCustomSequence: true,
32+
workflowEngineBaseUrl: 'http://localhost:8080',
33+
};
34+
35+
// Mock Camunda Client
36+
mockClient = sinon.createStubInstance(Client);
37+
sinon.stub(Client.prototype, 'subscribe').callsFake((_topic, callback) => {
38+
callback({
39+
task: {
40+
variables: {
41+
get: sinon.stub(),
42+
getTyped: sinon.stub(),
43+
getAll: sinon.stub(),
44+
getAllTyped: sinon.stub(),
45+
set: sinon.stub(),
46+
setTyped: sinon.stub(),
47+
setAll: sinon.stub(),
48+
setAllTyped: sinon.stub(),
49+
},
50+
},
51+
taskService: {
52+
complete: sinon.stub(),
53+
handleFailure: sinon.stub(),
54+
handleBpmnError: sinon.stub(),
55+
extendLock: sinon.stub(),
56+
unlock: sinon.stub(),
57+
},
58+
});
59+
return {unsubscribe: sinon.stub()};
60+
});
61+
62+
// Use fake timers to control setTimeout/setInterval
63+
clock = sinon.useFakeTimers();
64+
65+
provider = new WorkerImplementationProvider(config, mockLogger);
66+
67+
const mockCommand: ICommand = {
68+
parameters: {},
69+
execute: sinon.stub().resolves({}), // Mock execute() to return a resolved promise
70+
};
71+
72+
// Initialize Worker
73+
worker = {
74+
topic: 'test-topic',
75+
command: new BPMTask<AnyObject, AnyObject>(mockCommand),
76+
running: false,
77+
isInProgress: false,
78+
};
79+
});
80+
81+
afterEach(() => {
82+
clock.restore();
83+
sinon.restore();
84+
});
85+
86+
it('should initialize worker with isInProgress as false', async () => {
87+
const implementationFn = provider.value();
88+
await implementationFn(worker);
89+
expect(worker.isInProgress).to.be.false();
90+
});
91+
92+
it('should set isInProgress to true when processing starts', async () => {
93+
const implementationFn = provider.value();
94+
95+
// Modify existing stub instead of re-stubbing
96+
(worker.command.command.execute as sinon.SinonStub).callsFake(async () => {
97+
// verify isInProgress is true during worker execution flow
98+
expect(worker.isInProgress).to.be.true();
99+
return {};
100+
});
101+
102+
await implementationFn(worker);
103+
});
104+
105+
it('should return early if worker is already in progress', async () => {
106+
worker.isInProgress = true;
107+
provider.client = mockClient as unknown as Client;
108+
109+
const implementationFn = provider.value();
110+
await implementationFn(worker);
111+
112+
const operationStub = sinon.stub(worker.command, 'operation');
113+
await implementationFn(worker);
114+
115+
expect(operationStub.called).to.be.false();
116+
});
117+
118+
it('should set isInProgress to false when task completes', async () => {
119+
const implementationFn = provider.value();
120+
await implementationFn(worker);
121+
122+
// Properly stub the 'operation' method
123+
const operationStub = sinon.stub(worker.command, 'operation');
124+
125+
// Simulate task completion by invoking the callback with an empty object
126+
operationStub.callsFake((_data, done) => {
127+
if (done) done({});
128+
});
129+
130+
expect(worker.isInProgress).to.be.false();
131+
132+
// Restore the stub after the test
133+
operationStub.restore();
134+
});
135+
136+
it('should set isInProgress to false if an error occurs during polling', async () => {
137+
sinon.restore();
138+
const onStub = sinon
139+
.stub(mockClient, 'on')
140+
.callsArgWith(1, new Error('Test Poll Error'));
141+
142+
const implementationFn = provider.value();
143+
await implementationFn(worker);
144+
145+
// Simulate poll error
146+
/* eslint-disable @typescript-eslint/no-explicit-any */
147+
(provider.client as any).emit(
148+
'poll:error',
149+
new Error('Simulated Polling Error'),
150+
);
151+
152+
expect(worker.isInProgress).to.be.false();
153+
expect(worker.running).to.be.false();
154+
155+
onStub.restore();
156+
});
157+
});

services/bpmn-service/src/providers/register-worker.service.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,16 @@ export class WorkerRegisterFnProvider implements Provider<WorkerRegisterFn> {
2121
workerMap = {};
2222
}
2323
if (workerMap[workflowName]) {
24-
workerMap[workflowName].push({topic, command, running: false});
24+
workerMap[workflowName].push({
25+
topic,
26+
command,
27+
running: false,
28+
isInProgress: false,
29+
});
2530
} else {
26-
workerMap[workflowName] = [{topic, command, running: false}];
31+
workerMap[workflowName] = [
32+
{topic, command, running: false, isInProgress: false},
33+
];
2734
this.workerMapSetter(workerMap);
2835
}
2936
};

services/bpmn-service/src/providers/worker-implementation.provider.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33
// This software is released under the MIT License.
44
// https://opensource.org/licenses/MIT
55
import {inject, Provider} from '@loopback/context';
6-
import {IWorkflowServiceConfig, WorkerImplementationFn} from '../types';
7-
import {Client, logger} from 'camunda-external-task-client-js';
8-
import {WorkflowServiceBindings} from '../keys';
96
import {AnyObject} from '@loopback/repository';
107
import {ILogger, LOGGER} from '@sourceloop/core';
8+
import {Client, logger} from 'camunda-external-task-client-js';
9+
import {WorkflowServiceBindings} from '../keys';
10+
import {IWorkflowServiceConfig, WorkerImplementationFn} from '../types';
1111

1212
export class WorkerImplementationProvider
1313
implements Provider<WorkerImplementationFn>
@@ -31,22 +31,27 @@ export class WorkerImplementationProvider
3131
value(): WorkerImplementationFn {
3232
return async worker => {
3333
if (this.client) {
34+
worker.isInProgress = false;
3435
worker.running = true;
3536
const subscription = this.client.subscribe(
3637
worker.topic,
3738
({task, taskService}) => {
39+
if (worker.isInProgress) return;
40+
worker.isInProgress = true;
3841
worker.command.operation(
3942
{task, taskService},
4043
(result: AnyObject) => {
4144
if (result) {
4245
this.ilogger.info(`Worker task completed - ${worker.topic}`);
4346
}
47+
worker.isInProgress = false;
4448
},
4549
);
4650
},
4751
);
4852
this.client.on('poll:error', () => {
4953
worker.running = false;
54+
worker.isInProgress = false;
5055
subscription.unsubscribe();
5156
});
5257
} else {

services/bpmn-service/src/types/types.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
// https://opensource.org/licenses/MIT
55
import {AnyObject} from '@loopback/repository';
66
import {IServiceConfig} from '@sourceloop/core';
7+
import {Workflow, WorkflowDto, WorkflowVersion} from '../models';
78
import {BPMTask} from './bpm-task';
8-
import {Workflow, WorkflowVersion, WorkflowDto} from '../models';
99

1010
/* eslint-disable @typescript-eslint/no-explicit-any */
1111
export interface ICommand {
@@ -57,6 +57,7 @@ export type WorkerNameCmdPair<T = AnyObject, R = AnyObject> = {
5757
topic: string;
5858
command: BPMTask<T, R>;
5959
running: boolean;
60+
isInProgress: boolean;
6061
};
6162

6263
export type WorkerImplementationFn<T = AnyObject, R = AnyObject> = (

services/task-service/src/lifecycle-observers/command.observer.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ export class CommandObserver implements LifeCycleObserver {
3434
topic: command.topic,
3535
command: new BPMTask(command),
3636
running: false,
37+
isInProgress: false,
3738
});
3839
}
3940
this.logger.debug('Commands registered');

0 commit comments

Comments
 (0)