Skip to content

Commit 1fbb384

Browse files
authored
Merge pull request #9 from catcherwong/barrier
Support Barrier
2 parents 94ac9a4 + 1a81196 commit 1fbb384

12 files changed

Lines changed: 473 additions & 0 deletions
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
using Apps72.Dev.Data.DbMocker;
2+
using Apps72.Dev.Data.DbMocker.Data;
3+
using Xunit;
4+
using Dapper;
5+
using System;
6+
using System.Threading.Tasks;
7+
using Moq;
8+
using System.Linq;
9+
10+
namespace Dtmcli.Tests
11+
{
12+
public class BranchBarrierTests
13+
{
14+
[Theory]
15+
[InlineData("cancel", "try")]
16+
[InlineData("compensate", "action")]
17+
public async void Call_Should_Not_Trigger_When_IsNullCompensation(string op, string origin)
18+
{
19+
var branchBarrier = new BranchBarrier("tcc", "gid", "bid", op);
20+
21+
var conn = GetDbConnection();
22+
23+
// mock originAffected = 1
24+
conn.Mocks.When(cmd => cmd.Parameters.AsList().Select(x => x.Value).Contains(origin)).ReturnsScalar(cmd => 1);
25+
26+
// mock currentAffected != 0
27+
conn.Mocks.When(cmd => cmd.Parameters.AsList().Select(x => x.Value).Contains(op)).ReturnsScalar(cmd => 1);
28+
29+
var mockBusiCall = new Mock<Func<System.Data.Common.DbTransaction, Task>>();
30+
31+
await branchBarrier.Call(conn, mockBusiCall.Object);
32+
33+
mockBusiCall.Verify(x => x.Invoke(It.IsAny<System.Data.Common.DbTransaction>()), Times.Never);
34+
}
35+
36+
[Theory]
37+
[InlineData("other1", "other2")]
38+
public async void Call_Should_Not_Trigger_When_IsDuplicateOrPend(string op, string origin)
39+
{
40+
var branchBarrier = new BranchBarrier("tcc", "gid", "bid", op);
41+
42+
var conn = GetDbConnection();
43+
44+
// mock originAffected = 0
45+
conn.Mocks.When(cmd => cmd.Parameters.AsList().Select(x => x.Value).Contains(origin)).ReturnsScalar(cmd => 0);
46+
47+
// mock currentAffected = 0
48+
conn.Mocks.When(cmd => cmd.Parameters.AsList().Select(x => x.Value).Contains(op)).ReturnsScalar(cmd => 0);
49+
50+
var mockBusiCall = new Mock<Func<System.Data.Common.DbTransaction, Task>>();
51+
52+
await branchBarrier.Call(conn, mockBusiCall.Object);
53+
54+
mockBusiCall.Verify(x => x.Invoke(It.IsAny<System.Data.Common.DbTransaction>()), Times.Never);
55+
}
56+
57+
[Theory]
58+
[InlineData("cancel", "try")]
59+
[InlineData("compensate", "action")]
60+
public async void Call_Should_Trigger_When_IsNotNullCompensation_And_DuplicateOrPend(string op, string origin)
61+
{
62+
var branchBarrier = new BranchBarrier("tcc", "gid", "bid", op);
63+
64+
var conn = GetDbConnection();
65+
66+
// mock originAffected = 0
67+
conn.Mocks.When(cmd => cmd.Parameters.AsList().Select(x => x.Value).Contains(origin)).ReturnsScalar(cmd => 0);
68+
69+
// mock currentAffected > 0
70+
conn.Mocks.When(cmd => cmd.Parameters.AsList().Select(x => x.Value).Contains(op)).ReturnsScalar(cmd => 1);
71+
72+
var mockBusiCall = new Mock<Func<System.Data.Common.DbTransaction, Task>>();
73+
74+
await branchBarrier.Call(conn, mockBusiCall.Object);
75+
76+
mockBusiCall.Verify(x => x.Invoke(It.IsAny<System.Data.Common.DbTransaction>()), Times.Once);
77+
}
78+
79+
private MockDbConnection GetDbConnection() => new MockDbConnection();
80+
81+
82+
}
83+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
using Dtmcli.DtmImp;
2+
using System;
3+
using Xunit;
4+
5+
namespace Dtmcli.Tests
6+
{
7+
public class BranchIDGenTests
8+
{
9+
[Fact]
10+
public void TestNewSubBranchID()
11+
{
12+
var b = new BranchIDGen("");
13+
14+
// 01,...,09
15+
for (int i = 0; i < 9; i++)
16+
{
17+
var n = b.NewSubBranchID();
18+
Assert.Equal($"0{i+1}", n);
19+
}
20+
21+
// 10~98
22+
for (int i = 9; i < 99; i++)
23+
{
24+
var n = b.NewSubBranchID();
25+
Assert.Equal($"{i + 1}", n);
26+
}
27+
28+
// 99~
29+
Assert.Throws<ArgumentException>(() => b.NewSubBranchID());
30+
}
31+
32+
[Fact]
33+
public void NewSubBranchID_With_BranchId_Should_Succeed()
34+
{
35+
var b = new BranchIDGen("ss");
36+
var n = b.NewSubBranchID();
37+
Assert.Equal($"ss01", n);
38+
}
39+
40+
[Fact]
41+
public void NewSubBranchID_With_BranchId_Should_Throw_Exception()
42+
{
43+
var b = new BranchIDGen("sssssssssssssssssssss");
44+
Assert.Throws<ArgumentException>(() => b.NewSubBranchID());
45+
}
46+
47+
}
48+
}

src/Dtmcli.Tests/DbSpecialTests.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
using Xunit;
2+
3+
namespace Dtmcli.Tests
4+
{
5+
public class DbSpecialTests
6+
{
7+
[Fact]
8+
public void TestDbSpecial()
9+
{
10+
DtmImp.DbSpecialDelegate.Instance.SetCurrentDBType("mysql");
11+
12+
var mysql = DtmImp.DbSpecialDelegate.Instance.GetDBSpecial();
13+
Assert.Equal("xa start 'xa1'", mysql.GetXaSQL("start", "xa1"));
14+
Assert.Equal("insert ignore into a(f) values(@f)", mysql.GetInsertIgnoreTemplate("a(f) values(@f)", "c"));
15+
16+
DtmImp.DbSpecialDelegate.Instance.SetCurrentDBType("postgres");
17+
18+
var postgres = DtmImp.DbSpecialDelegate.Instance.GetDBSpecial();
19+
Assert.Equal("begin", postgres.GetXaSQL("start", "xa1"));
20+
Assert.Equal("insert into a(f) values(@f) on conflict ON CONSTRAINT c do nothing", postgres.GetInsertIgnoreTemplate("a(f) values(@f)", "c"));
21+
}
22+
}
23+
}

src/Dtmcli.Tests/Dtmcli.Tests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
</PropertyGroup>
99

1010
<ItemGroup>
11+
<PackageReference Include="DbMocker" Version="1.21.0" />
1112
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.11.0" />
1213
<PackageReference Include="Moq" Version="4.16.1" />
1314
<PackageReference Include="xunit" Version="2.4.1" />
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
using Dapper;
2+
using Microsoft.Extensions.Logging;
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Data.Common;
6+
using System.Threading.Tasks;
7+
8+
namespace Dtmcli
9+
{
10+
public class BranchBarrier
11+
{
12+
public BranchBarrier(string transType, string gid, string branchID, string op, ILogger logger = null)
13+
{
14+
this.TransType = transType;
15+
this.Gid = gid;
16+
this.BranchID = branchID;
17+
this.Op = op;
18+
this.Logger = logger;
19+
}
20+
21+
internal ILogger Logger { get; private set; }
22+
23+
public string TransType { get; set; }
24+
25+
public string Gid { get; set; }
26+
27+
public string BranchID { get; set; }
28+
29+
public string Op { get; set; }
30+
31+
public int BarrierID { get; set; }
32+
33+
public async Task Call(DbConnection db, Func<DbTransaction, Task> busiCall)
34+
{
35+
this.BarrierID = this.BarrierID + 1;
36+
var bid = this.BarrierID.ToString().PadLeft(2, '0');
37+
38+
// check the connection state
39+
if(db.State != System.Data.ConnectionState.Open) await db.OpenAsync();
40+
41+
var tx = db.BeginTransaction();
42+
43+
try
44+
{
45+
var originType = BarrierStatic.TypeDict.TryGetValue(this.Op, out var ot) ? ot : string.Empty;
46+
47+
var originAffected = await DbUtils.InsertBarrier(db, this.TransType, this.Gid, this.BranchID, originType, bid, this.Op, tx);
48+
var currentAffected = await DbUtils.InsertBarrier(db, this.TransType, this.Gid, this.BranchID, this.Op, bid, this.Op, tx);
49+
50+
Logger?.LogDebug("originAffected: {originAffected} currentAffected: {currentAffected}", originAffected, currentAffected);
51+
52+
var isNullCompensation = IsNullCompensation(this.Op, originAffected);
53+
var isDuplicateOrPend = IsDuplicateOrPend(currentAffected);
54+
55+
if (isNullCompensation || isDuplicateOrPend)
56+
{
57+
Logger?.LogInformation("Will not exec busiCall, isNullCompensation={isNullCompensation}, isDuplicateOrPend={isDuplicateOrPend}", isNullCompensation, isDuplicateOrPend);
58+
tx.Commit();
59+
return;
60+
}
61+
62+
await busiCall.Invoke(tx);
63+
tx.Commit();
64+
}
65+
catch (Exception ex)
66+
{
67+
Logger?.LogError(ex, "Call exception");
68+
tx.Rollback();
69+
}
70+
}
71+
72+
public async Task<string> QueryPrepared(DbConnection db)
73+
{
74+
bool isErr = false;
75+
try
76+
{
77+
var tmp = DbUtils.InsertBarrier(db, this.TransType, this.Gid, "00", "msg", "01", "rollback");
78+
}
79+
catch (Exception ex)
80+
{
81+
Logger?.LogInformation(ex, "QueryPrepared error");
82+
isErr = true;
83+
}
84+
85+
var reason = string.Empty;
86+
87+
if (!isErr)
88+
{
89+
var sql = string.Format("select reason from {0} where gid=@gid and branch_id=@branch_id and op=@op and barrier_id=@barrier_id", Constant.Barrier.TABLE_NAME);
90+
91+
reason = await db.QueryFirstOrDefaultAsync(
92+
sql,
93+
new { gid = this.Gid, branch_id = this.BranchID, op = this.Op, barrier_id = this.BarrierID });
94+
}
95+
96+
if (reason.Equals("rollback")) return "FAILURE";
97+
98+
return string.Empty;
99+
}
100+
101+
/// <summary>
102+
/// 空补偿
103+
/// </summary>
104+
/// <param name="op"></param>
105+
/// <param name="originAffected"></param>
106+
/// <returns></returns>
107+
private bool IsNullCompensation(string op, int originAffected)
108+
=> (op.Equals(Constant.BranchCancel) || op.Equals(Constant.Request.BRANCH_COMPENSATE)) && originAffected > 0;
109+
110+
/// <summary>
111+
/// 这个是重复请求或者悬挂
112+
/// </summary>
113+
/// <param name="currentAffected"></param>
114+
/// <returns></returns>
115+
private bool IsDuplicateOrPend(int currentAffected)
116+
=> currentAffected == 0;
117+
118+
public bool IsInValid()
119+
{
120+
return string.IsNullOrWhiteSpace(this.TransType)
121+
|| string.IsNullOrWhiteSpace(this.Gid)
122+
|| string.IsNullOrWhiteSpace(this.BranchID)
123+
|| string.IsNullOrWhiteSpace(this.Op);
124+
}
125+
126+
public override string ToString()
127+
=> $"transInfo: {TransType} {Gid} {BranchID} {Op}";
128+
}
129+
130+
internal class BarrierStatic
131+
{
132+
internal static readonly Dictionary<string, string> TypeDict = new Dictionary<string, string>()
133+
{
134+
{ Constant.BranchCancel, Constant.BranchTry },
135+
{ Constant.Request.BRANCH_COMPENSATE, Constant.Request.BRANCH_ACTION },
136+
};
137+
}
138+
}

src/Dtmcli/Barrier/DbUtils.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
using Dapper;
2+
using Dtmcli.DtmImp;
3+
using System.Data.Common;
4+
using System.Threading.Tasks;
5+
6+
namespace Dtmcli
7+
{
8+
public static class DbUtils
9+
{
10+
public static async Task<int> InsertBarrier(DbConnection db, string transType, string gid, string branchID, string op, string barrierID, string reason, DbTransaction tx = null)
11+
{
12+
if (db == null) return -1;
13+
if (string.IsNullOrWhiteSpace(op)) return 0;
14+
15+
var str = string.Concat(Constant.Barrier.TABLE_NAME, "(trans_type, gid, branch_id, op, barrier_id, reason) values(@trans_type,@gid,@branch_id,@op,@barrier_id,@reason)");
16+
var sql = DbSpecialDelegate.Instance.GetDBSpecial().GetInsertIgnoreTemplate(str, Constant.Barrier.PG_CONSTRAINT);
17+
18+
sql = DbSpecialDelegate.Instance.GetDBSpecial().GetPlaceHoldSQL(sql);
19+
20+
var result = await db.ExecuteAsync(
21+
sql,
22+
new { trans_type = transType, gid = gid, branch_id = branchID, op = op, barrier_id = barrierID, reason = reason },
23+
transaction: tx);
24+
25+
return result;
26+
}
27+
}
28+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
using Microsoft.Extensions.Logging;
2+
using System;
3+
4+
namespace Dtmcli
5+
{
6+
public class DefaultBranchBarrierFactory : IBranchBarrierFactory
7+
{
8+
private readonly ILogger _logger;
9+
10+
public DefaultBranchBarrierFactory(ILoggerFactory loggerFactory)
11+
{
12+
this._logger = loggerFactory.CreateLogger<DefaultBranchBarrierFactory>();
13+
}
14+
15+
public BranchBarrier CreateBranchBarrier(string transType, string gid, string branchID, string op, ILogger logger = null)
16+
{
17+
if(logger == null) logger = _logger;
18+
19+
var ti = new BranchBarrier(transType, gid, branchID, op, logger);
20+
21+
if (ti.IsInValid()) throw new Exception($"invalid trans info: {ti.ToString()}");
22+
23+
return ti;
24+
}
25+
}
26+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
using Microsoft.Extensions.Logging;
2+
3+
namespace Dtmcli
4+
{
5+
public interface IBranchBarrierFactory
6+
{
7+
BranchBarrier CreateBranchBarrier(string transType, string gid, string branchID, string op, ILogger logger = null);
8+
}
9+
}

src/Dtmcli/Constant.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,5 +103,16 @@ internal class Request
103103

104104
internal static readonly string TYPE_SAGA = "saga";
105105
}
106+
107+
internal class Barrier
108+
{
109+
internal static readonly string TABLE_NAME = "dtm_barrier.barrier";
110+
111+
internal static readonly string DBTYPE_MYSQL = "mysql";
112+
113+
internal static readonly string DBTYPE_POSTGRES = "postgres";
114+
115+
internal static readonly string PG_CONSTRAINT = "uniq_barrier";
116+
}
106117
}
107118
}

0 commit comments

Comments
 (0)