Skip to content

Commit 07e2f04

Browse files
authored
msg support add topic (#64)
* feat(msg): add topic * add test and sample for msg topic * test: grpc msg topic
1 parent 5127441 commit 07e2f04

7 files changed

Lines changed: 70 additions & 3 deletions

File tree

samples/DtmSample/Controllers/MsgTestController.cs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,5 +218,47 @@ public async Task<IActionResult> MsgMongoQueryPrepared(CancellationToken cancell
218218
var res = await bb.MongoQueryPrepared(cli);
219219
return Ok(new { dtm_result = res });
220220
}
221+
222+
/// <summary>
223+
/// MSG with not exist topic will get 【topic not found】
224+
/// </summary>
225+
/// <param name="cancellationToken"></param>
226+
/// <returns></returns>
227+
[HttpGet("msg-topic-notfound")]
228+
public async Task<IActionResult> MsgWithTopicNotFound(CancellationToken cancellationToken)
229+
{
230+
var gid = await _dtmClient.GenGid(cancellationToken);
231+
var req = new TransRequest("1", -30);
232+
var msg = _transFactory.NewMsg(gid)
233+
.AddTopic("not_exist_topic", req);
234+
235+
await msg.Prepare(_settings.BusiUrl + "/msg-queryprepared", cancellationToken);
236+
await msg.Submit(cancellationToken);
237+
238+
return Ok(TransResponse.BuildSucceedResponse());
239+
}
240+
241+
/// <summary>
242+
/// MSG with exist topic
243+
/// </summary>
244+
/// <param name="cancellationToken"></param>
245+
/// <returns></returns>
246+
[HttpGet("msg-topic")]
247+
public async Task<IActionResult> MsgWithTopic(CancellationToken cancellationToken)
248+
{
249+
var gid = await _dtmClient.GenGid(cancellationToken);
250+
251+
// should subscribe at first
252+
var topic ="mytopic";
253+
254+
var req = new TransRequest("1", -30);
255+
var msg = _transFactory.NewMsg(gid)
256+
.AddTopic(topic, req);
257+
258+
await msg.Prepare(_settings.BusiUrl + "/msg-queryprepared", cancellationToken);
259+
await msg.Submit(cancellationToken);
260+
261+
return Ok(TransResponse.BuildSucceedResponse());
262+
}
221263
}
222264
}

src/DtmCommon/Constant.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ public class Constant
1818
/// </summary>
1919
public static readonly string ResultDuplicated = "DUPLICATED";
2020

21+
public static readonly string MsgTopicPrefix = "topic://";
22+
2123
internal class Op
2224
{
2325
internal static readonly string Submit = "Submit";

src/Dtmcli/Msg/Msg.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@ public Msg Add(string action, object postData)
3434
return this;
3535
}
3636

37+
public Msg AddTopic(string topic, object postData)
38+
{
39+
return this.Add($"{DtmCommon.Constant.MsgTopicPrefix}{topic}", postData);
40+
}
41+
3742
public async Task Prepare(string queryPrepared, CancellationToken cancellationToken = default)
3843
{
3944
this._transBase.QueryPrepared = !string.IsNullOrWhiteSpace(queryPrepared)? queryPrepared : this._transBase.QueryPrepared;

src/Dtmgrpc/Msg/MsgGrpc.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ public MsgGrpc Add(string action, IMessage payload)
3838
return this;
3939
}
4040

41+
public MsgGrpc AddTopic(string topic, IMessage payload)
42+
{
43+
return this.Add($"{Constant.MsgTopicPrefix}{topic}", payload);
44+
}
45+
4146
public async Task Prepare(string queryPrepared, CancellationToken cancellationToken = default)
4247
{
4348
this._transBase.QueryPrepared = !string.IsNullOrWhiteSpace(queryPrepared) ? queryPrepared : this._transBase.QueryPrepared;

src/Dtmgrpc/dtmgpb/dtmgimp.proto

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ service Dtm {
1313
rpc Prepare(DtmRequest) returns (google.protobuf.Empty) {}
1414
rpc Abort(DtmRequest) returns (google.protobuf.Empty) {}
1515
rpc RegisterBranch(DtmBranchRequest) returns (google.protobuf.Empty) {}
16+
rpc Subscribe(DtmTopicRequest) returns (google.protobuf.Empty){}
17+
rpc Unsubscribe(DtmTopicRequest) returns (google.protobuf.Empty){}
18+
rpc DeleteTopic(DtmTopicRequest) returns (google.protobuf.Empty){}
1619
}
1720

1821
message DtmTransOptions {
@@ -50,3 +53,9 @@ message DtmBranchRequest {
5053
map<string, string> Data = 5;
5154
bytes BusiPayload = 6;
5255
}
56+
57+
message DtmTopicRequest {
58+
string Topic = 1;
59+
string URL = 2;
60+
string Remark = 3;
61+
}

tests/Dtmcli.Tests/MsgTests.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System;
66
using System.Collections.Generic;
77
using System.Data.Common;
8+
using System.Linq;
89
using System.Net.Http;
910
using System.Threading;
1011
using System.Threading.Tasks;
@@ -45,6 +46,7 @@ public async void Submit_Should_Succeed()
4546

4647
msg.Add(busi + "/TransOut", req)
4748
.Add(busi + "/TransIn", req)
49+
.AddTopic("test-topic", req)
4850
.EnableWaitResult()
4951
.SetRetryInterval(10)
5052
.SetTimeoutToFail(100)
@@ -209,10 +211,11 @@ protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage
209211
Assert.Equal(100, transBase.TimeoutToFail);
210212
Assert.Contains("bh1", transBase.BranchHeaders.Keys);
211213
Assert.Contains("bh2", transBase.BranchHeaders.Keys);
212-
Assert.Equal(2, transBase.Payloads.Count);
213-
Assert.Equal(2, transBase.Steps.Count);
214+
Assert.Equal(3, transBase.Payloads.Count);
215+
Assert.Equal(3, transBase.Steps.Count);
216+
Assert.Contains("topic://test-topic", transBase.Steps.SelectMany(x => x.Values).ToList());
214217

215-
if(request.RequestUri.AbsolutePath.Contains("submit",StringComparison.OrdinalIgnoreCase))
218+
if (request.RequestUri.AbsolutePath.Contains("submit",StringComparison.OrdinalIgnoreCase))
216219
{
217220
Assert.NotEmpty(transBase.CustomData);
218221
Assert.Contains("10", transBase.CustomData);

tests/Dtmgrpc.Tests/MsgTests.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public async void Submit_Should_Succeed()
4848

4949
msg.Add(busi + "/TransOut", req)
5050
.Add(busi + "/TransIn", req)
51+
.AddTopic("mytopic", req)
5152
.EnableWaitResult()
5253
.SetRetryInterval(10)
5354
.SetTimeoutToFail(100)

0 commit comments

Comments
 (0)