@@ -34,25 +34,21 @@ type sagaInvocation struct {
3434 startedByRPCID string
3535}
3636
37- func (si * sagaInvocation ) setCorrelationIDs (message * gbus.BusMessage , isEvent bool , targetService string ) {
37+ func (si * sagaInvocation ) setCorrelationIDs (message * gbus.BusMessage , targetService string , semantics gbus. Semantics ) {
3838
39- message .CorrelationID = si .inboundMsg .ID
4039 message .SagaID = si .sagaID
4140
42- if ! isEvent {
43- //support saga-to-saga communication
44- if si .inboundMsg .SagaID != "" {
45- message . SagaCorrelationID = si . inboundMsg . SagaID
46- }
41+ if semantics == gbus . REPLY {
42+ message . CorrelationID = si . inboundMsg . ID
43+ message . SagaCorrelationID = si .inboundMsg .SagaID
44+
45+ } else if semantics == gbus . CMD {
4746 //if the saga is potentially invoking itself then set the SagaCorrelationID to reflect that
4847 //https://github.com/wework/grabbit/issues/64
49-
5048 if targetService == si .hostingSvc {
5149 message .SagaCorrelationID = message .SagaID
5250 }
53-
5451 }
55-
5652}
5753func (si * sagaInvocation ) HostingSvc () string {
5854 return si .hostingSvc
@@ -64,13 +60,13 @@ func (si *sagaInvocation) InvokingSvc() string {
6460
6561func (si * sagaInvocation ) Reply (ctx context.Context , message * gbus.BusMessage ) error {
6662 _ , targetService := si .decoratedInvocation .Routing ()
67- si .setCorrelationIDs (message , false , targetService )
63+ si .setCorrelationIDs (message , targetService , gbus . REPLY )
6864 return si .decoratedInvocation .Reply (ctx , message )
6965}
7066
7167func (si * sagaInvocation ) ReplyToInitiator (ctx context.Context , message * gbus.BusMessage ) error {
7268
73- si .setCorrelationIDs (message , false , si .startedBy )
69+ si .setCorrelationIDs (message , si .startedBy , gbus . REPLY )
7470
7571 //override the correlation ids to those of the message creating the saga
7672 message .SagaCorrelationID = si .startedBySaga
@@ -93,13 +89,13 @@ func (si *sagaInvocation) Ctx() context.Context {
9389
9490func (si * sagaInvocation ) Send (ctx context.Context , toService string ,
9591 command * gbus.BusMessage , policies ... gbus.MessagePolicy ) error {
96- si .setCorrelationIDs (command , false , toService )
92+ si .setCorrelationIDs (command , toService , gbus . CMD )
9793 return si .decoratedBus .Send (ctx , toService , command , policies ... )
9894}
9995
10096func (si * sagaInvocation ) Publish (ctx context.Context , exchange , topic string ,
10197 event * gbus.BusMessage , policies ... gbus.MessagePolicy ) error {
102- si .setCorrelationIDs (event , true , "" )
98+ si .setCorrelationIDs (event , "" , gbus . EVT )
10399 return si .decoratedBus .Publish (ctx , exchange , topic , event , policies ... )
104100}
105101
0 commit comments