forked from dtm-labs/client-csharp
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathMsg.cs
More file actions
173 lines (148 loc) · 6.1 KB
/
Msg.cs
File metadata and controls
173 lines (148 loc) · 6.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
using DtmCommon;
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Net.Http;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
namespace Dtmcli
{
/// <summary>
/// Builder and driver for a transaction of type <c>DtmCommon.Constant.TYPE_MSG</c>.
/// Branch actions and their JSON payloads are accumulated on an internal <see cref="TransBase"/>,
/// and the prepare / submit / abort operations are sent through the injected <see cref="IDtmClient"/>.
/// The fluent methods return the same instance so calls can be chained.
/// </summary>
public class Msg
{
    // Delay requested through SetDelay; only written into CustomData when it is greater than zero.
    private long _delay = 0;
    private readonly TransBase _transBase;
    private readonly IDtmClient _dtmClient;
    private readonly IBranchBarrierFactory _branchBarrierFactory;

    /// <summary>
    /// Creates a message transaction for <paramref name="gid"/> without setting a next cron time.
    /// </summary>
    /// <param name="dtmHttpClient">client used to send operations</param>
    /// <param name="branchBarrierFactory">factory used by <see cref="DoAndSubmit"/> to create the barrier</param>
    /// <param name="gid">global transaction id</param>
    public Msg(IDtmClient dtmHttpClient, IBranchBarrierFactory branchBarrierFactory, string gid)
        : this(dtmHttpClient, branchBarrierFactory, gid, default)
    {
    }

    /// <summary>
    /// Creates a message transaction for <paramref name="gid"/>.
    /// </summary>
    /// <param name="dtmHttpClient">client used to send operations</param>
    /// <param name="branchBarrierFactory">factory used by <see cref="DoAndSubmit"/> to create the barrier</param>
    /// <param name="gid">global transaction id</param>
    /// <param name="nextCronTime">
    /// stored as the transaction's NextCronTime unless it equals <c>default(DateTime)</c>,
    /// in which case NextCronTime is left as created
    /// </param>
    public Msg(IDtmClient dtmHttpClient, IBranchBarrierFactory branchBarrierFactory, string gid, DateTime nextCronTime)
    {
        this._dtmClient = dtmHttpClient;
        this._branchBarrierFactory = branchBarrierFactory;
        this._transBase = TransBase.NewTransBase(gid, DtmCommon.Constant.TYPE_MSG, string.Empty, string.Empty);

        if (nextCronTime != default(DateTime))
        {
            this._transBase.NextCronTime = nextCronTime;
        }
    }

    /// <summary>
    /// Appends one step: <paramref name="action"/> is recorded under the branch-action key and
    /// <paramref name="postData"/> is serialized to JSON as the matching payload.
    /// </summary>
    /// <param name="action">action of the branch</param>
    /// <param name="postData">payload object, serialized with <see cref="JsonSerializer"/></param>
    /// <returns>this instance</returns>
    public Msg Add(string action, object postData)
    {
        // Steps and Payloads are created lazily on first use.
        if (this._transBase.Steps == null)
        {
            this._transBase.Steps = new List<Dictionary<string, string>>();
        }

        if (this._transBase.Payloads == null)
        {
            this._transBase.Payloads = new List<string>();
        }

        this._transBase.Steps.Add(new Dictionary<string, string> { { Constant.Request.BRANCH_ACTION, action } });
        this._transBase.Payloads.Add(JsonSerializer.Serialize(postData));
        return this;
    }

    /// <summary>
    /// Appends a step whose action is <paramref name="topic"/> prefixed with
    /// <c>DtmCommon.Constant.MsgTopicPrefix</c>.
    /// </summary>
    /// <param name="topic">topic name, without the prefix</param>
    /// <param name="postData">payload object, serialized to JSON</param>
    /// <returns>this instance</returns>
    public Msg AddTopic(string topic, object postData)
    {
        return this.Add($"{DtmCommon.Constant.MsgTopicPrefix}{topic}", postData);
    }

    /// <summary>
    /// Sends the prepare operation for this transaction.
    /// </summary>
    /// <param name="queryPrepared">
    /// replaces the stored QueryPrepared value unless it is null, empty or whitespace,
    /// in which case the stored value is kept
    /// </param>
    /// <param name="cancellationToken">token passed to the client call</param>
    public async Task Prepare(string queryPrepared, CancellationToken cancellationToken = default)
    {
        this._transBase.QueryPrepared = !string.IsNullOrWhiteSpace(queryPrepared) ? queryPrepared : this._transBase.QueryPrepared;
        await this._dtmClient.TransCallDtm(this._transBase, this._transBase, Constant.Request.OPERATION_PREPARE, cancellationToken);
    }

    /// <summary>
    /// Applies the custom options (see <see cref="SetDelay"/>) and sends the submit operation.
    /// </summary>
    /// <param name="cancellationToken">token passed to the client call</param>
    public async Task Submit(CancellationToken cancellationToken = default)
    {
        this.BuildCustomOptions();
        await this._dtmClient.TransCallDtm(this._transBase, this._transBase, Constant.Request.OPERATION_SUBMIT, cancellationToken);
    }

    /// <summary>
    /// Same as <see cref="DoAndSubmit"/>, with the business call executed through
    /// the barrier's <c>Call</c> method on <paramref name="db"/>.
    /// </summary>
    /// <param name="queryPrepared">value forwarded to <see cref="DoAndSubmit"/></param>
    /// <param name="db">connection handed to the barrier call</param>
    /// <param name="busiCall">business logic that receives the <see cref="DbTransaction"/></param>
    /// <param name="cancellationToken">token forwarded to <see cref="DoAndSubmit"/></param>
    public async Task DoAndSubmitDB(string queryPrepared, DbConnection db, Func<DbTransaction, Task> busiCall, CancellationToken cancellationToken = default)
    {
        await this.DoAndSubmit(
            queryPrepared,
            async bb =>
            {
                await bb.Call(db, busiCall);
            },
            cancellationToken);
    }

    /// <summary>
    /// Prepares the transaction, runs <paramref name="busiCall"/>, then submits or aborts
    /// depending on the outcome:
    /// <list type="bullet">
    /// <item><description>business call succeeds: submit;</description></item>
    /// <item><description>business call throws <see cref="DtmFailureException"/>: abort;</description></item>
    /// <item><description>business call throws anything else: the branch is queried through
    /// <paramref name="queryPrepared"/>; a failure result aborts, a result without error submits,
    /// and any other error does neither.</description></item>
    /// </list>
    /// Whenever the business call threw, that same exception is rethrown at the end with its
    /// original stack trace preserved.
    /// </summary>
    /// <param name="queryPrepared">passed to <see cref="Prepare"/> and used for the branch query</param>
    /// <param name="busiCall">business logic, given the barrier created for this transaction</param>
    /// <param name="cancellationToken">token passed to prepare, the branch query and submit</param>
    /// <exception cref="DtmException">the created barrier reports itself as invalid</exception>
    public async Task DoAndSubmit(string queryPrepared, Func<BranchBarrier, Task> busiCall, CancellationToken cancellationToken = default)
    {
        var bb = _branchBarrierFactory.CreateBranchBarrier(this._transBase.TransType, this._transBase.Gid, DtmCommon.Constant.Barrier.MSG_BRANCHID, DtmCommon.Constant.TYPE_MSG);
        if (bb.IsInValid())
        {
            throw new DtmException($"invalid trans info: {bb.ToString()}");
        }

        await this.Prepare(queryPrepared, cancellationToken);

        // The business exception is held back so the transaction outcome can be settled first.
        Exception errb = null;
        try
        {
            await busiCall.Invoke(bb);
        }
        catch (Exception ex)
        {
            errb = ex;
        }

        Exception err = null;
        if (errb != null && !(errb is DtmFailureException))
        {
            // if busicall return an error other than failure, we will query the result
            var resp = await _dtmClient.TransRequestBranch(this._transBase, HttpMethod.Get, null, bb.BranchID, bb.Op, queryPrepared, cancellationToken);
            err = await DtmImp.Utils.RespAsErrorCompatible(resp);
        }

        if (errb is DtmFailureException || err is DtmFailureException)
        {
            // NOTE(review): abort is sent with the default token instead of cancellationToken,
            // presumably so that a cancelled caller still aborts the transaction. Confirm.
            await _dtmClient.TransCallDtm(_transBase, _transBase, Constant.Request.OPERATION_ABORT, default);
        }
        else if (err == null)
        {
            await this.Submit(cancellationToken);
        }

        if (errb != null)
        {
            // "throw errb;" would reset the stack trace to this line; ExceptionDispatchInfo
            // rethrows the same exception instance while keeping the frames from busiCall.
            System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(errb).Throw();
        }
    }

    /// <summary>
    /// Enable wait result for trans
    /// </summary>
    /// <returns>this instance</returns>
    public Msg EnableWaitResult()
    {
        this._transBase.WaitResult = true;
        return this;
    }

    /// <summary>
    /// Set timeout to fail for trans, unit is second
    /// </summary>
    /// <param name="timeoutToFail">timeout to fail</param>
    /// <returns>this instance</returns>
    public Msg SetTimeoutToFail(long timeoutToFail)
    {
        this._transBase.TimeoutToFail = timeoutToFail;
        return this;
    }

    /// <summary>
    /// Set retry interval for trans, unit is second
    /// </summary>
    /// <param name="retryInterval">retry interval</param>
    /// <returns>this instance</returns>
    public Msg SetRetryInterval(long retryInterval)
    {
        this._transBase.RetryInterval = retryInterval;
        return this;
    }

    /// <summary>
    /// Set branch headers for trans
    /// </summary>
    /// <param name="headers">headers stored on the transaction, replacing any previous value</param>
    /// <returns>this instance</returns>
    public Msg SetBranchHeaders(Dictionary<string, string> headers)
    {
        this._transBase.BranchHeaders = headers;
        return this;
    }

    /// <summary>
    /// Set delay to call branch, unit second.
    /// The value is only written to the transaction when <see cref="Submit"/> runs,
    /// and only if it is greater than zero.
    /// </summary>
    /// <param name="delay">delay second</param>
    /// <returns>this instance</returns>
    public Msg SetDelay(long delay)
    {
        this._delay = delay;
        return this;
    }

    /// <summary>
    /// Writes the delay, when positive, into CustomData as a JSON object with a single
    /// <c>delay</c> property. A non-positive delay leaves CustomData untouched.
    /// </summary>
    private void BuildCustomOptions()
    {
        if (this._delay > 0)
        {
            _transBase.CustomData = JsonSerializer.Serialize(new { delay = this._delay });
        }
    }
}
}