Skip to content

Commit 80ec068

Browse files
committed
remove remnants of discarded flows
1 parent 0ab64cb commit 80ec068

26 files changed

Lines changed: 9032 additions & 192 deletions

dist/activity/Activity.js

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ Activity.prototype.resume = function resume() {
435435
};
436436

437437
/**
438-
* Discard the activity. Stops execution if running and discards outbound flows.
438+
* Discard the activity. Stops execution if running; the activity leaves without taking any outbound flow.
439439
* @param {Record<string, any>} [discardContent] Optional content propagated with the discard
440440
* @returns {void}
441441
*/
@@ -707,16 +707,11 @@ Activity.prototype._onInbound = function onInbound(routingKey, message) {
707707
message: content.message,
708708
inbound
709709
});
710-
case 'flow.discard':
711710
case 'activity.discard':
712711
{
713-
let discardSequence;
714-
if (content.discardSequence) discardSequence = content.discardSequence.slice();
715-
const context = {
716-
inbound,
717-
discardSequence
718-
};
719-
return this[K_FLAGS].isParallelGateway ? this.run(context) : this._runDiscard(context);
712+
return this._runDiscard({
713+
inbound
714+
});
720715
}
721716
}
722717
};
@@ -768,7 +763,6 @@ Activity.prototype._onInboundEvent = function onInboundEvent(routingKey, message
768763
}
769764
case 'association.take':
770765
case 'flow.take':
771-
case 'flow.discard':
772766
return inboundQ.queueMessage(fields, (0, _messageHelper.cloneContent)(content), properties);
773767
}
774768
};
@@ -972,7 +966,7 @@ Activity.prototype._onExecutionMessage = function onExecutionMessage(routingKey,
972966
switch (routingKey) {
973967
case 'execution.outbound.take':
974968
{
975-
return this._doOutbound(message, false, (err, outbound) => {
969+
return this._doOutbound(message, (err, outbound) => {
976970
message.ack();
977971
if (err) return this.emitFatal(err, content);
978972
broker.publish('run', 'run.execute.passthrough', (0, _messageHelper.cloneContent)(content, {
@@ -1027,13 +1021,13 @@ Activity.prototype._doRunLeave = function doRunLeave(message, isDiscarded, onOut
10271021
properties
10281022
} = message;
10291023
const correlationId = properties.correlationId;
1030-
if (content.ignoreOutbound) {
1024+
if (isDiscarded || content.ignoreOutbound) {
10311025
this.broker.publish('run', 'run.leave', (0, _messageHelper.cloneContent)(content), {
10321026
correlationId
10331027
});
10341028
return onOutbound();
10351029
}
1036-
return this._doOutbound((0, _messageHelper.cloneMessage)(message), isDiscarded, (err, outbound) => {
1030+
return this._doOutbound((0, _messageHelper.cloneMessage)(message), (err, outbound) => {
10371031
if (err) {
10381032
return this._publishEvent('error', {
10391033
...content,
@@ -1054,16 +1048,12 @@ Activity.prototype._doRunLeave = function doRunLeave(message, isDiscarded, onOut
10541048
};
10551049

10561050
/** @internal */
1057-
Activity.prototype._doOutbound = function doOutbound(fromMessage, isDiscarded, callback) {
1051+
Activity.prototype._doOutbound = function doOutbound(fromMessage, callback) {
10581052
const outboundSequenceFlows = this[K_FLOWS].outboundSequenceFlows;
10591053
if (!outboundSequenceFlows.length) return callback(null, []);
10601054
const fromContent = fromMessage.content;
10611055
let outboundFlows;
1062-
if (isDiscarded) {
1063-
outboundFlows = outboundSequenceFlows.map(flow => (0, _outboundEvaluator.formatFlowAction)(flow, {
1064-
action: 'discard'
1065-
}));
1066-
} else if (fromContent.outbound?.length) {
1056+
if (fromContent.outbound?.length) {
10671057
outboundFlows = outboundSequenceFlows.map(flow => (0, _outboundEvaluator.formatFlowAction)(flow, fromContent.outbound.filter(f => f.id === flow.id).pop()));
10681058
}
10691059
if (outboundFlows) {

dist/flows/SequenceFlow.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,9 @@ SequenceFlow.prototype.take = function take(content) {
8787
};
8888

8989
/**
90-
* Discard the flow and publish flow.discard. Detects loops via discardSequence and emits
91-
* flow.looped instead when the target id is already in the sequence.
90+
* Discard the flow and publish flow.discard.
91+
*
92+
* @deprecated The execution runtime no longer discards sequence flows, so this is a no-op during a run. It will be removed in a future version.
9293
* @param {Record<string, any>} [content]
9394
*/
9495
SequenceFlow.prototype.discard = function discard(content = {}) {

dist/gateways/ParallelGateway.js

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,7 @@ function ParallelGateway(activityDef, context) {
3737
const cachedPeers = context.getShakenPeers(id);
3838
if (cachedPeers) {
3939
for (const [flowId, sourceIds] of cachedPeers) {
40-
let peer = peers.get(flowId);
41-
if (!peer) peers.set(flowId, peer = new Set());
40+
const peer = peers.get(flowId);
4241
for (const sourceId of sourceIds) peer.add(sourceId);
4342
}
4443
activity[K_PEERS_DISCOVERED] = true;
@@ -162,19 +161,15 @@ ParallelGatewayBehaviour.prototype._onPeerEnterMessage = function onPeerEnterMes
162161
if (peer) this.peerMonitor.running.set(message.content.id, peer);
163162
};
164163
ParallelGatewayBehaviour.prototype._complete = function complete() {
165-
const take = this.peerMonitor.inbound.some(({
166-
action
167-
}) => action === 'take');
168164
this.broker.cancel('_converging-inbound', false);
169165
this._stop();
170-
const state = take ? 'completed' : 'discard';
171-
this.activity.logger.debug(`<${this.executionId} (${this.id})> completed monitoring with state: ${state}`);
166+
this.activity.logger.debug(`<${this.executionId} (${this.id})> completed monitoring`);
172167
const content = (0, _messageHelper.cloneContent)(this[_constants.K_EXECUTE_MESSAGE].content, {
173168
isRootScope: true,
174-
state
169+
state: 'completed'
175170
});
176171
content.inbound = this.peerMonitor.inbound;
177-
return this.broker.publish('execution', `execute.${state}`, content);
172+
return this.broker.publish('execution', 'execute.completed', content);
178173
};
179174
ParallelGatewayBehaviour.prototype._stop = function stop() {
180175
this.broker.cancel('_converging-inbound');

dist/messageHelper.js

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ exports.unshiftParent = unshiftParent;
1717
*/
1818
function cloneContent(content, extend) {
1919
const {
20-
discardSequence,
2120
inbound,
2221
outbound,
2322
parent,
@@ -32,9 +31,6 @@ function cloneContent(content, extend) {
3231
if (parent) {
3332
clone.parent = cloneParent(parent);
3433
}
35-
if (discardSequence) {
36-
clone.discardSequence = discardSequence.slice();
37-
}
3834
if (inbound) {
3935
clone.inbound = inbound.map(c => cloneContent(c));
4036
}

dist/process/ProcessExecution.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -712,7 +712,6 @@ ProcessExecution.prototype._onChildMessage = function onChildMessage(routingKey,
712712
if (!prevMsg) return message.ack();
713713
break;
714714
}
715-
case 'flow.looped':
716715
case 'activity.leave':
717716
return this._onChildCompleted(message);
718717
}

src/activity/Activity.js

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,7 @@ Activity.prototype.resume = function resume() {
428428
};
429429

430430
/**
431-
* Discard the activity. Stops execution if running and discards outbound flows.
431+
* Discard the activity. Stops execution if running; the activity leaves without taking any outbound flow.
432432
* @param {Record<string, any>} [discardContent] Optional content propagated with the discard
433433
* @returns {void}
434434
*/
@@ -677,12 +677,8 @@ Activity.prototype._onInbound = function onInbound(routingKey, message) {
677677
message: content.message,
678678
inbound,
679679
});
680-
case 'flow.discard':
681680
case 'activity.discard': {
682-
let discardSequence;
683-
if (content.discardSequence) discardSequence = content.discardSequence.slice();
684-
const context = { inbound, discardSequence };
685-
return this[K_FLAGS].isParallelGateway ? this.run(context) : this._runDiscard(context);
681+
return this._runDiscard({ inbound });
686682
}
687683
}
688684
};
@@ -721,7 +717,6 @@ Activity.prototype._onInboundEvent = function onInboundEvent(routingKey, message
721717
}
722718
case 'association.take':
723719
case 'flow.take':
724-
case 'flow.discard':
725720
return inboundQ.queueMessage(fields, cloneContent(content), properties);
726721
}
727722
};
@@ -918,7 +913,7 @@ Activity.prototype._onExecutionMessage = function onExecutionMessage(routingKey,
918913

919914
switch (routingKey) {
920915
case 'execution.outbound.take': {
921-
return this._doOutbound(message, false, (err, outbound) => {
916+
return this._doOutbound(message, (err, outbound) => {
922917
message.ack();
923918
if (err) return this.emitFatal(err, content);
924919
broker.publish('run', 'run.execute.passthrough', cloneContent(content, { outbound }));
@@ -958,12 +953,12 @@ Activity.prototype._ackRunExecuteMessage = function ackRunExecuteMessage() {
958953
Activity.prototype._doRunLeave = function doRunLeave(message, isDiscarded, onOutbound) {
959954
const { content, properties } = message;
960955
const correlationId = properties.correlationId;
961-
if (content.ignoreOutbound) {
956+
if (isDiscarded || content.ignoreOutbound) {
962957
this.broker.publish('run', 'run.leave', cloneContent(content), { correlationId });
963958
return onOutbound();
964959
}
965960

966-
return this._doOutbound(cloneMessage(message), isDiscarded, (err, outbound) => {
961+
return this._doOutbound(cloneMessage(message), (err, outbound) => {
967962
if (err) {
968963
return this._publishEvent('error', { ...content, error: err }, { correlationId });
969964
}
@@ -982,16 +977,14 @@ Activity.prototype._doRunLeave = function doRunLeave(message, isDiscarded, onOut
982977
};
983978

984979
/** @internal */
985-
Activity.prototype._doOutbound = function doOutbound(fromMessage, isDiscarded, callback) {
980+
Activity.prototype._doOutbound = function doOutbound(fromMessage, callback) {
986981
const outboundSequenceFlows = this[K_FLOWS].outboundSequenceFlows;
987982
if (!outboundSequenceFlows.length) return callback(null, []);
988983

989984
const fromContent = fromMessage.content;
990985

991986
let outboundFlows;
992-
if (isDiscarded) {
993-
outboundFlows = outboundSequenceFlows.map((flow) => formatFlowAction(flow, { action: 'discard' }));
994-
} else if (fromContent.outbound?.length) {
987+
if (fromContent.outbound?.length) {
995988
outboundFlows = outboundSequenceFlows.map((flow) => formatFlowAction(flow, fromContent.outbound.filter((f) => f.id === flow.id).pop()));
996989
}
997990

src/flows/SequenceFlow.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,9 @@ SequenceFlow.prototype.take = function take(content) {
6868
};
6969

7070
/**
71-
* Discard the flow and publish flow.discard. Detects loops via discardSequence and emits
72-
* flow.looped instead when the target id is already in the sequence.
71+
* Discard the flow and publish flow.discard.
72+
*
73+
* @deprecated The execution runtime no longer discards sequence flows, so this is a no-op during a run. It will be removed in a future version.
7374
* @param {Record<string, any>} [content]
7475
*/
7576
SequenceFlow.prototype.discard = function discard(content = {}) {

src/gateways/ParallelGateway.js

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -177,20 +177,16 @@ ParallelGatewayBehaviour.prototype._onPeerEnterMessage = function onPeerEnterMes
177177
};
178178

179179
ParallelGatewayBehaviour.prototype._complete = function complete() {
180-
const take = this.peerMonitor.inbound.some(({ action }) => action === 'take');
181-
182180
this.broker.cancel('_converging-inbound', false);
183181

184182
this._stop();
185183

186-
const state = take ? 'completed' : 'discard';
187-
188-
this.activity.logger.debug(`<${this.executionId} (${this.id})> completed monitoring with state: ${state}`);
184+
this.activity.logger.debug(`<${this.executionId} (${this.id})> completed monitoring`);
189185

190-
const content = cloneContent(this[K_EXECUTE_MESSAGE].content, { isRootScope: true, state });
186+
const content = cloneContent(this[K_EXECUTE_MESSAGE].content, { isRootScope: true, state: 'completed' });
191187
content.inbound = this.peerMonitor.inbound;
192188

193-
return this.broker.publish('execution', `execute.${state}`, content);
189+
return this.broker.publish('execution', 'execute.completed', content);
194190
};
195191

196192
ParallelGatewayBehaviour.prototype._stop = function stop() {

src/messageHelper.js

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
* @returns cloned content
66
*/
77
export function cloneContent(content, extend) {
8-
const { discardSequence, inbound, outbound, parent, sequence } = content;
8+
const { inbound, outbound, parent, sequence } = content;
99

1010
/** @type {import('#types').ElementMessageContent} */
1111
const clone = {
@@ -16,9 +16,6 @@ export function cloneContent(content, extend) {
1616
if (parent) {
1717
clone.parent = cloneParent(parent);
1818
}
19-
if (discardSequence) {
20-
clone.discardSequence = discardSequence.slice();
21-
}
2219
if (inbound) {
2320
clone.inbound = inbound.map((c) => cloneContent(c));
2421
}

src/process/ProcessExecution.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -710,7 +710,6 @@ ProcessExecution.prototype._onChildMessage = function onChildMessage(routingKey,
710710
if (!prevMsg) return message.ack();
711711
break;
712712
}
713-
case 'flow.looped':
714713
case 'activity.leave':
715714
return this._onChildCompleted(message);
716715
}

0 commit comments

Comments
 (0)