diff --git a/src/DtmCommon/Imp/TransBase.cs b/src/DtmCommon/Imp/TransBase.cs index 1b81d67..6df7a77 100644 --- a/src/DtmCommon/Imp/TransBase.cs +++ b/src/DtmCommon/Imp/TransBase.cs @@ -1,4 +1,5 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Text.Json.Serialization; namespace DtmCommon @@ -76,6 +77,8 @@ public class TransBase [JsonIgnore] public string Dtm { get; set; } + public DateTime NextCronTime { get; set; } + public static TransBase NewTransBase(string gid, string transType, string dtm, string branchID) { return new TransBase diff --git a/src/Dtmcli/DtmTransFactory.cs b/src/Dtmcli/DtmTransFactory.cs index 2449773..06945e3 100644 --- a/src/Dtmcli/DtmTransFactory.cs +++ b/src/Dtmcli/DtmTransFactory.cs @@ -1,4 +1,6 @@ -namespace Dtmcli +using System; + +namespace Dtmcli { public class DtmTransFactory : IDtmTransFactory { @@ -16,7 +18,13 @@ public Msg NewMsg(string gid) var msg = new Msg(_cient, _branchBarrierFactory, gid); return msg; } - + + public Msg NewMsg(string gid, DateTime nextCronTime) + { + var msg = new Msg(_cient, _branchBarrierFactory, gid, nextCronTime); + return msg; + } + public Saga NewSaga(string gid) { var saga = new Saga(_cient, gid); diff --git a/src/Dtmcli/IDtmTransFactory.cs b/src/Dtmcli/IDtmTransFactory.cs index dc44883..f6b25c4 100644 --- a/src/Dtmcli/IDtmTransFactory.cs +++ b/src/Dtmcli/IDtmTransFactory.cs @@ -1,9 +1,13 @@ -namespace Dtmcli +using System; + +namespace Dtmcli { public interface IDtmTransFactory { Saga NewSaga(string gid); Msg NewMsg(string gid); + + Msg NewMsg(string gid, DateTime nextCronTime); } } diff --git a/src/Dtmcli/Msg/Msg.cs b/src/Dtmcli/Msg/Msg.cs index 520127a..3b80510 100644 --- a/src/Dtmcli/Msg/Msg.cs +++ b/src/Dtmcli/Msg/Msg.cs @@ -17,13 +17,22 @@ public class Msg private readonly IDtmClient _dtmClient; private readonly IBranchBarrierFactory _branchBarrierFactory; - public Msg(IDtmClient dtmHttpClient, IBranchBarrierFactory branchBarrierFactory, string gid) + public Msg(IDtmClient dtmHttpClient, IBranchBarrierFactory branchBarrierFactory, string gid): + this(dtmHttpClient, branchBarrierFactory, gid, default) + { + } + + public Msg(IDtmClient dtmHttpClient, IBranchBarrierFactory branchBarrierFactory, string gid, DateTime nextCronTime) { this._dtmClient = dtmHttpClient; this._branchBarrierFactory = branchBarrierFactory; this._transBase = TransBase.NewTransBase(gid, DtmCommon.Constant.TYPE_MSG, string.Empty, string.Empty); + if (nextCronTime != default(DateTime)) + { + this._transBase.NextCronTime = nextCronTime; + } } - + public Msg Add(string action, object postData) { if (this._transBase.Steps == null) this._transBase.Steps = new List>(); diff --git a/src/Dtmgrpc/DtmGImp/Utils.cs b/src/Dtmgrpc/DtmGImp/Utils.cs index d79a26c..6f05372 100644 --- a/src/Dtmgrpc/DtmGImp/Utils.cs +++ b/src/Dtmgrpc/DtmGImp/Utils.cs @@ -167,6 +167,8 @@ public static dtmgpb.DtmRequest BuildDtmRequest(TransBase transBase) Steps = transBase.Steps == null ? string.Empty : Utils.ToJsonString(transBase.Steps), RollbackReason = transBase.RollbackReason ?? string.Empty, }; + if (transBase.NextCronTime != default) + dtmRequest.NextCronTime = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(transBase.NextCronTime.ToUniversalTime()); foreach (var item in transBase.BinPayloads ?? new List()) { diff --git a/src/Dtmgrpc/DtmTransFactory.cs b/src/Dtmgrpc/DtmTransFactory.cs index 733f672..62339f1 100644 --- a/src/Dtmgrpc/DtmTransFactory.cs +++ b/src/Dtmgrpc/DtmTransFactory.cs @@ -1,4 +1,5 @@ -using DtmCommon; +using System; +using DtmCommon; using Dtmgrpc.DtmGImp; using Microsoft.Extensions.Options; @@ -19,10 +20,21 @@ public DtmTransFactory(IOptions optionsAccs, IDtmgRPCClient rpcClien public MsgGrpc NewMsgGrpc(string gid) { - var msg = new MsgGrpc(_rpcClient, _branchBarrierFactory, _options.DtmGrpcUrl.GetWithoutPrefixgRPCUrl(), gid); + return this.NewMsgGrpc(gid, default); + } + + /// + /// + /// + /// + /// The desired execution time, which can be used to delay downstream consumption + /// + public MsgGrpc NewMsgGrpc(string gid, DateTime nextCronTime) + { + var msg = new MsgGrpc(_rpcClient, _branchBarrierFactory, _options.DtmGrpcUrl.GetWithoutPrefixgRPCUrl(), gid, nextCronTime); return msg; } - + public SagaGrpc NewSagaGrpc(string gid) { var saga = new SagaGrpc(_rpcClient, _options.DtmGrpcUrl.GetWithoutPrefixgRPCUrl(), gid); diff --git a/src/Dtmgrpc/IDtmTransFactory.cs b/src/Dtmgrpc/IDtmTransFactory.cs index 90f4670..2e1ac19 100644 --- a/src/Dtmgrpc/IDtmTransFactory.cs +++ b/src/Dtmgrpc/IDtmTransFactory.cs @@ -1,10 +1,20 @@ -namespace Dtmgrpc +using System; + +namespace Dtmgrpc { public interface IDtmTransFactory { SagaGrpc NewSagaGrpc(string gid); MsgGrpc NewMsgGrpc(string gid); + + /// + /// + /// + /// + /// The desired execution time, which can be used to delay downstream consumption + /// + MsgGrpc NewMsgGrpc(string gid, DateTime nextCronTime); TccGrpc NewTccGrpc(string gid); } diff --git a/src/Dtmgrpc/Msg/MsgGrpc.cs b/src/Dtmgrpc/Msg/MsgGrpc.cs index 7bfefa2..4d1dc64 100644 --- a/src/Dtmgrpc/Msg/MsgGrpc.cs +++ b/src/Dtmgrpc/Msg/MsgGrpc.cs @@ -22,10 +22,19 @@ public class MsgGrpc private readonly IBranchBarrierFactory _branchBarrierFactory; public MsgGrpc(IDtmgRPCClient dtmHttpClient, IBranchBarrierFactory branchBarrierFactory, string server, string gid) + : this(dtmHttpClient, branchBarrierFactory, server, gid, default) + { + } + + public MsgGrpc(IDtmgRPCClient dtmHttpClient, IBranchBarrierFactory branchBarrierFactory, string server, string gid, DateTime nextCronTime) { this._dtmClient = dtmHttpClient; this._branchBarrierFactory = branchBarrierFactory; this._transBase = TransBase.NewTransBase(gid, Constant.TYPE_MSG, server, string.Empty); + if (nextCronTime != default(DateTime)) + { + this._transBase.NextCronTime = nextCronTime; + } } public MsgGrpc Add(string action, IMessage payload) diff --git a/src/Dtmgrpc/dtmgpb/dtmgimp.proto b/src/Dtmgrpc/dtmgpb/dtmgimp.proto index f2a6385..221f393 100644 --- a/src/Dtmgrpc/dtmgpb/dtmgimp.proto +++ b/src/Dtmgrpc/dtmgpb/dtmgimp.proto @@ -3,6 +3,7 @@ option csharp_namespace = "dtmgpb"; option go_package = "./dtmgpb"; import "google/protobuf/empty.proto"; +import "google/protobuf/timestamp.proto"; package dtmgimp; @@ -40,6 +41,7 @@ message DtmRequest { string Steps = 7; map ReqExtra = 8; string RollbackReason = 9; + google.protobuf.Timestamp NextCronTime = 10; } message DtmGidReply { diff --git a/tests/Dtmcli.IntegrationTests/MsgHttpTest.cs b/tests/Dtmcli.IntegrationTests/MsgHttpTest.cs index fc3e1c6..e8bc43a 100644 --- a/tests/Dtmcli.IntegrationTests/MsgHttpTest.cs +++ b/tests/Dtmcli.IntegrationTests/MsgHttpTest.cs @@ -59,4 +59,35 @@ public async Task Submit_With_EffectTime_Should_Succeed_Later() status = await ITTestHelper.GetTranStatus(gid); Assert.Equal("succeed", status); } + + [Fact] + public async Task Submit_With_NextCronTime_Should_Succeed_Later() + { + var provider = ITTestHelper.AddDtmHttp(); + var transFactory = provider.GetRequiredService(); + + var gid = "msgTestGid" + Guid.NewGuid().ToString(); + DateTime effectTime = DateTime.Now.AddSeconds(10); + var msg = transFactory.NewMsg(gid, effectTime); + var req = ITTestHelper.GenBusiReq(false, false); + var busiUrl = ITTestHelper.BuisHttpUrl; + msg.Add(busiUrl + "/busi.Busi/TransOut", req) + .Add(busiUrl + "/busi.Busi/TransIn", req); + + await msg.Prepare(busiUrl + "/busi.Busi/QueryPrepared_404"); + await msg.Submit(); + + // Since the downstream execution is delayed by 10 seconds, it will be 'submitted' after 2 seconds and 'succeed' after 15 seconds + await Task.Delay(TimeSpan.FromSeconds(0)); + var status = await ITTestHelper.GetTranStatus(gid); + Assert.Equal("submitted", status); + + await Task.Delay(TimeSpan.FromSeconds(2)); + status = await ITTestHelper.GetTranStatus(gid); + Assert.Equal("submitted", status); + + await Task.Delay(TimeSpan.FromSeconds(13)); + status = await ITTestHelper.GetTranStatus(gid); + Assert.Equal("succeed", status); + } } \ No newline at end of file diff --git a/tests/Dtmgrpc.IntegrationTests/MsgGrpcTest.cs b/tests/Dtmgrpc.IntegrationTests/MsgGrpcTest.cs index 38d0d0c..67b6001 100644 --- a/tests/Dtmgrpc.IntegrationTests/MsgGrpcTest.cs +++ b/tests/Dtmgrpc.IntegrationTests/MsgGrpcTest.cs @@ -92,6 +92,32 @@ public async Task Submit_With_EffectTime_Should_Succeed_Later() Assert.Equal("succeed", status); } + [Fact] + public async Task Submit_With_NextCronTime_Should_Succeed_Later() + { + var provider = ITTestHelper.AddDtmGrpc(); + var transFactory = provider.GetRequiredService(); + + var gid = "msgTestGid" + Guid.NewGuid().ToString(); + DateTime effectTime = DateTime.Now.AddSeconds(10); + var msg = transFactory.NewMsgGrpc(gid, effectTime); + var req = ITTestHelper.GenBusiReq(false, false); + var busiGrpc = ITTestHelper.BuisgRPCUrl; + msg.Add(busiGrpc + "/busi.Busi/TransOut", req) + .Add(busiGrpc + "/busi.Busi/TransIn", req); + + await msg.Prepare(busiGrpc + "/busi.Busi/QueryPrepared"); + await msg.Submit(); + + // Since the downstream execution is delayed by 10 seconds, it will be 'submitted' after 2 seconds and 'succeed' after 15 seconds + await Task.Delay(TimeSpan.FromSeconds(2)); + var status = await ITTestHelper.GetTranStatus(gid); + Assert.Equal("submitted", status); + + await Task.Delay(TimeSpan.FromSeconds(13)); + status = await ITTestHelper.GetTranStatus(gid); + Assert.Equal("succeed", status); + } private static readonly int TransOutUID = 1;