Skip to content

Commit eea5f0e

Browse files
committed
feat(MSG gPRC/http) Initialize NextCronTime during prepare in MSG mode
1 parent c72e128 commit eea5f0e

11 files changed

Lines changed: 126 additions & 10 deletions

File tree

src/DtmCommon/Imp/TransBase.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System.Collections.Generic;
1+
using System;
2+
using System.Collections.Generic;
23
using System.Text.Json.Serialization;
34

45
namespace DtmCommon
@@ -76,6 +77,8 @@ public class TransBase
7677
[JsonIgnore]
7778
public string Dtm { get; set; }
7879

80+
public DateTime NextCronTime { get; set; }
81+
7982
public static TransBase NewTransBase(string gid, string transType, string dtm, string branchID)
8083
{
8184
return new TransBase

src/Dtmcli/DtmTransFactory.cs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
namespace Dtmcli
1+
using System;
2+
3+
namespace Dtmcli
24
{
35
public class DtmTransFactory : IDtmTransFactory
46
{
@@ -16,7 +18,13 @@ public Msg NewMsg(string gid)
1618
var msg = new Msg(_cient, _branchBarrierFactory, gid);
1719
return msg;
1820
}
19-
21+
22+
public Msg NewMsg(string gid, DateTime nextCronTime)
23+
{
24+
var msg = new Msg(_cient, _branchBarrierFactory, gid, nextCronTime);
25+
return msg;
26+
}
27+
2028
public Saga NewSaga(string gid)
2129
{
2230
var saga = new Saga(_cient, gid);

src/Dtmcli/IDtmTransFactory.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1-
namespace Dtmcli
1+
using System;
2+
3+
namespace Dtmcli
24
{
35
public interface IDtmTransFactory
46
{
57
Saga NewSaga(string gid);
68

79
Msg NewMsg(string gid);
10+
11+
Msg NewMsg(string gid, DateTime nextCronTime);
812
}
913
}

src/Dtmcli/Msg/Msg.cs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,22 @@ public class Msg
1717
private readonly IDtmClient _dtmClient;
1818
private readonly IBranchBarrierFactory _branchBarrierFactory;
1919

20-
public Msg(IDtmClient dtmHttpClient, IBranchBarrierFactory branchBarrierFactory, string gid)
20+
public Msg(IDtmClient dtmHttpClient, IBranchBarrierFactory branchBarrierFactory, string gid):
21+
this(dtmHttpClient, branchBarrierFactory, gid, default)
22+
{
23+
}
24+
25+
public Msg(IDtmClient dtmHttpClient, IBranchBarrierFactory branchBarrierFactory, string gid, DateTime nextCronTime)
2126
{
2227
this._dtmClient = dtmHttpClient;
2328
this._branchBarrierFactory = branchBarrierFactory;
2429
this._transBase = TransBase.NewTransBase(gid, DtmCommon.Constant.TYPE_MSG, string.Empty, string.Empty);
30+
if (nextCronTime != default(DateTime))
31+
{
32+
this._transBase.NextCronTime = nextCronTime;
33+
}
2534
}
26-
35+
2736
public Msg Add(string action, object postData)
2837
{
2938
if (this._transBase.Steps == null) this._transBase.Steps = new List<Dictionary<string, string>>();

src/Dtmgrpc/DtmGImp/Utils.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,8 @@ public static dtmgpb.DtmRequest BuildDtmRequest(TransBase transBase)
167167
Steps = transBase.Steps == null ? string.Empty : Utils.ToJsonString(transBase.Steps),
168168
RollbackReason = transBase.RollbackReason ?? string.Empty,
169169
};
170+
if (transBase.NextCronTime != default)
171+
dtmRequest.NextCronTime = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(transBase.NextCronTime.ToUniversalTime());
170172

171173
foreach (var item in transBase.BinPayloads ?? new List<byte[]>())
172174
{

src/Dtmgrpc/DtmTransFactory.cs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using DtmCommon;
1+
using System;
2+
using DtmCommon;
23
using Dtmgrpc.DtmGImp;
34
using Microsoft.Extensions.Options;
45

@@ -19,10 +20,21 @@ public DtmTransFactory(IOptions<DtmOptions> optionsAccs, IDtmgRPCClient rpcClien
1920

2021
public MsgGrpc NewMsgGrpc(string gid)
2122
{
22-
var msg = new MsgGrpc(_rpcClient, _branchBarrierFactory, _options.DtmGrpcUrl.GetWithoutPrefixgRPCUrl(), gid);
23+
return this.NewMsgGrpc(gid, default);
24+
}
25+
26+
/// <summary>
27+
///
28+
/// </summary>
29+
/// <param name="gid"></param>
30+
/// <param name="nextCronTime">The desired execution time, which can be used to delay downstream consumption</param>
31+
/// <returns></returns>
32+
public MsgGrpc NewMsgGrpc(string gid, DateTime nextCronTime)
33+
{
34+
var msg = new MsgGrpc(_rpcClient, _branchBarrierFactory, _options.DtmGrpcUrl.GetWithoutPrefixgRPCUrl(), gid, nextCronTime);
2335
return msg;
2436
}
25-
37+
2638
public SagaGrpc NewSagaGrpc(string gid)
2739
{
2840
var saga = new SagaGrpc(_rpcClient, _options.DtmGrpcUrl.GetWithoutPrefixgRPCUrl(), gid);

src/Dtmgrpc/IDtmTransFactory.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,20 @@
1-
namespace Dtmgrpc
1+
using System;
2+
3+
namespace Dtmgrpc
24
{
35
public interface IDtmTransFactory
46
{
57
SagaGrpc NewSagaGrpc(string gid);
68

79
MsgGrpc NewMsgGrpc(string gid);
10+
11+
/// <summary>
12+
///
13+
/// </summary>
14+
/// <param name="gid"></param>
15+
/// <param name="nextCronTime">The desired execution time, which can be used to delay downstream consumption</param>
16+
/// <returns></returns>
17+
MsgGrpc NewMsgGrpc(string gid, DateTime nextCronTime);
818

919
TccGrpc NewTccGrpc(string gid);
1020
}

src/Dtmgrpc/Msg/MsgGrpc.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,19 @@ public class MsgGrpc
2222
private readonly IBranchBarrierFactory _branchBarrierFactory;
2323

2424
public MsgGrpc(IDtmgRPCClient dtmHttpClient, IBranchBarrierFactory branchBarrierFactory, string server, string gid)
25+
: this(dtmHttpClient, branchBarrierFactory, server, gid, default)
26+
{
27+
}
28+
29+
public MsgGrpc(IDtmgRPCClient dtmHttpClient, IBranchBarrierFactory branchBarrierFactory, string server, string gid, DateTime nextCronTime)
2530
{
2631
this._dtmClient = dtmHttpClient;
2732
this._branchBarrierFactory = branchBarrierFactory;
2833
this._transBase = TransBase.NewTransBase(gid, Constant.TYPE_MSG, server, string.Empty);
34+
if (nextCronTime != default(DateTime))
35+
{
36+
this._transBase.NextCronTime = nextCronTime;
37+
}
2938
}
3039

3140
public MsgGrpc Add(string action, IMessage payload)

src/Dtmgrpc/dtmgpb/dtmgimp.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
option csharp_namespace = "dtmgpb";
44
option go_package = "./dtmgpb";
55
import "google/protobuf/empty.proto";
6+
import "google/protobuf/timestamp.proto";
67

78
package dtmgimp;
89

@@ -40,6 +41,7 @@ message DtmRequest {
4041
string Steps = 7;
4142
map<string, string> ReqExtra = 8;
4243
string RollbackReason = 9;
44+
google.protobuf.Timestamp NextCronTime = 10;
4345
}
4446

4547
message DtmGidReply {

tests/Dtmcli.IntegrationTests/MsgHttpTest.cs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,35 @@ public async Task Submit_With_EffectTime_Should_Succeed_Later()
5959
status = await ITTestHelper.GetTranStatus(gid);
6060
Assert.Equal("succeed", status);
6161
}
62+
63+
[Fact]
64+
public async Task Submit_With_NextCronTime_Should_Succeed_Later()
65+
{
66+
var provider = ITTestHelper.AddDtmHttp();
67+
var transFactory = provider.GetRequiredService<Dtmcli.IDtmTransFactory>();
68+
69+
var gid = "msgTestGid" + Guid.NewGuid().ToString();
70+
DateTime effectTime = DateTime.Now.AddSeconds(10);
71+
var msg = transFactory.NewMsg(gid, effectTime);
72+
var req = ITTestHelper.GenBusiReq(false, false);
73+
var busiUrl = ITTestHelper.BuisHttpUrl;
74+
msg.Add(busiUrl + "/busi.Busi/TransOut", req)
75+
.Add(busiUrl + "/busi.Busi/TransIn", req);
76+
77+
await msg.Prepare(busiUrl + "/busi.Busi/QueryPrepared_404");
78+
await msg.Submit();
79+
80+
// Since the downstream execution is delayed by 10 seconds, it will be 'submitted' after 2 seconds and 'succeed' after 15 seconds
81+
await Task.Delay(TimeSpan.FromSeconds(0));
82+
var status = await ITTestHelper.GetTranStatus(gid);
83+
Assert.Equal("submitted", status);
84+
85+
await Task.Delay(TimeSpan.FromSeconds(2));
86+
status = await ITTestHelper.GetTranStatus(gid);
87+
Assert.Equal("submitted", status);
88+
89+
await Task.Delay(TimeSpan.FromSeconds(13));
90+
status = await ITTestHelper.GetTranStatus(gid);
91+
Assert.Equal("succeed", status);
92+
}
6293
}

0 commit comments

Comments
 (0)