Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/DtmCommon/Imp/TransBase.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Text.Json.Serialization;

namespace DtmCommon
Expand Down Expand Up @@ -76,6 +77,8 @@ public class TransBase
[JsonIgnore]
public string Dtm { get; set; }

public DateTime NextCronTime { get; set; }

public static TransBase NewTransBase(string gid, string transType, string dtm, string branchID)
{
return new TransBase
Expand Down
12 changes: 10 additions & 2 deletions src/Dtmcli/DtmTransFactory.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace Dtmcli
using System;

namespace Dtmcli
{
public class DtmTransFactory : IDtmTransFactory
{
Expand All @@ -16,7 +18,13 @@ public Msg NewMsg(string gid)
var msg = new Msg(_cient, _branchBarrierFactory, gid);
return msg;
}


public Msg NewMsg(string gid, DateTime nextCronTime)
{
var msg = new Msg(_cient, _branchBarrierFactory, gid, nextCronTime);
return msg;
}

public Saga NewSaga(string gid)
{
var saga = new Saga(_cient, gid);
Expand Down
6 changes: 5 additions & 1 deletion src/Dtmcli/IDtmTransFactory.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
namespace Dtmcli
using System;

namespace Dtmcli
{
public interface IDtmTransFactory
{
Saga NewSaga(string gid);

Msg NewMsg(string gid);

Msg NewMsg(string gid, DateTime nextCronTime);
}
}
13 changes: 11 additions & 2 deletions src/Dtmcli/Msg/Msg.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,22 @@ public class Msg
private readonly IDtmClient _dtmClient;
private readonly IBranchBarrierFactory _branchBarrierFactory;

public Msg(IDtmClient dtmHttpClient, IBranchBarrierFactory branchBarrierFactory, string gid)
public Msg(IDtmClient dtmHttpClient, IBranchBarrierFactory branchBarrierFactory, string gid):
this(dtmHttpClient, branchBarrierFactory, gid, default)
{
}

public Msg(IDtmClient dtmHttpClient, IBranchBarrierFactory branchBarrierFactory, string gid, DateTime nextCronTime)
{
this._dtmClient = dtmHttpClient;
this._branchBarrierFactory = branchBarrierFactory;
this._transBase = TransBase.NewTransBase(gid, DtmCommon.Constant.TYPE_MSG, string.Empty, string.Empty);
if (nextCronTime != default(DateTime))
{
this._transBase.NextCronTime = nextCronTime;
}
}

public Msg Add(string action, object postData)
{
if (this._transBase.Steps == null) this._transBase.Steps = new List<Dictionary<string, string>>();
Expand Down
2 changes: 2 additions & 0 deletions src/Dtmgrpc/DtmGImp/Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ public static dtmgpb.DtmRequest BuildDtmRequest(TransBase transBase)
Steps = transBase.Steps == null ? string.Empty : Utils.ToJsonString(transBase.Steps),
RollbackReason = transBase.RollbackReason ?? string.Empty,
};
if (transBase.NextCronTime != default)
dtmRequest.NextCronTime = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(transBase.NextCronTime.ToUniversalTime());

foreach (var item in transBase.BinPayloads ?? new List<byte[]>())
{
Expand Down
18 changes: 15 additions & 3 deletions src/Dtmgrpc/DtmTransFactory.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using DtmCommon;
using System;
using DtmCommon;
using Dtmgrpc.DtmGImp;
using Microsoft.Extensions.Options;

Expand All @@ -19,10 +20,21 @@ public DtmTransFactory(IOptions<DtmOptions> optionsAccs, IDtmgRPCClient rpcClien

public MsgGrpc NewMsgGrpc(string gid)
{
var msg = new MsgGrpc(_rpcClient, _branchBarrierFactory, _options.DtmGrpcUrl.GetWithoutPrefixgRPCUrl(), gid);
return this.NewMsgGrpc(gid, default);
}

/// <summary>
///
/// </summary>
/// <param name="gid"></param>
/// <param name="nextCronTime">The desired execution time, which can be used to delay downstream consumption</param>
/// <returns></returns>
public MsgGrpc NewMsgGrpc(string gid, DateTime nextCronTime)
{
var msg = new MsgGrpc(_rpcClient, _branchBarrierFactory, _options.DtmGrpcUrl.GetWithoutPrefixgRPCUrl(), gid, nextCronTime);
return msg;
}

public SagaGrpc NewSagaGrpc(string gid)
{
var saga = new SagaGrpc(_rpcClient, _options.DtmGrpcUrl.GetWithoutPrefixgRPCUrl(), gid);
Expand Down
12 changes: 11 additions & 1 deletion src/Dtmgrpc/IDtmTransFactory.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
namespace Dtmgrpc
using System;

namespace Dtmgrpc
{
public interface IDtmTransFactory
{
SagaGrpc NewSagaGrpc(string gid);

MsgGrpc NewMsgGrpc(string gid);

/// <summary>
///
/// </summary>
/// <param name="gid"></param>
/// <param name="nextCronTime">The desired execution time, which can be used to delay downstream consumption</param>
/// <returns></returns>
MsgGrpc NewMsgGrpc(string gid, DateTime nextCronTime);

TccGrpc NewTccGrpc(string gid);
}
Expand Down
9 changes: 9 additions & 0 deletions src/Dtmgrpc/Msg/MsgGrpc.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,19 @@ public class MsgGrpc
private readonly IBranchBarrierFactory _branchBarrierFactory;

public MsgGrpc(IDtmgRPCClient dtmHttpClient, IBranchBarrierFactory branchBarrierFactory, string server, string gid)
: this(dtmHttpClient, branchBarrierFactory, server, gid, default)
{
}

public MsgGrpc(IDtmgRPCClient dtmHttpClient, IBranchBarrierFactory branchBarrierFactory, string server, string gid, DateTime nextCronTime)
{
this._dtmClient = dtmHttpClient;
this._branchBarrierFactory = branchBarrierFactory;
this._transBase = TransBase.NewTransBase(gid, Constant.TYPE_MSG, server, string.Empty);
if (nextCronTime != default(DateTime))
{
this._transBase.NextCronTime = nextCronTime;
}
}

public MsgGrpc Add(string action, IMessage payload)
Expand Down
2 changes: 2 additions & 0 deletions src/Dtmgrpc/dtmgpb/dtmgimp.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
option csharp_namespace = "dtmgpb";
option go_package = "./dtmgpb";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";

package dtmgimp;

Expand Down Expand Up @@ -40,6 +41,7 @@ message DtmRequest {
string Steps = 7;
map<string, string> ReqExtra = 8;
string RollbackReason = 9;
google.protobuf.Timestamp NextCronTime = 10;
}

message DtmGidReply {
Expand Down
1 change: 1 addition & 0 deletions tests/BusiGrpcService/BusiGrpcService.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Dtmcli\Dtmcli.csproj" />
<ProjectReference Include="..\..\src\Dtmgrpc\Dtmgrpc.csproj" />
<ProjectReference Include="..\..\src\DtmSERedisBarrier\DtmSERedisBarrier.csproj" />
</ItemGroup>
Expand Down
72 changes: 72 additions & 0 deletions tests/BusiGrpcService/Controllers/BusiApiController.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
using System.Text.Json;
using BusiGrpcService.Dtos;
using Microsoft.AspNetCore.Mvc;

namespace BusiGrpcService.Controllers
{
[ApiController]
[Route("http/busi.Busi")]
public class BusiApiController : ControllerBase
{
private readonly ILogger<BusiApiController> _logger;
private readonly Dtmcli.IBranchBarrierFactory _barrierFactory;
private readonly Dtmgrpc.IBranchBarrierFactory _grpcBarrierFactory;

public BusiApiController(ILogger<BusiApiController> logger, Dtmcli.IBranchBarrierFactory barrierFactory, Dtmgrpc.IBranchBarrierFactory grpcBarrierFactory)
{
_logger = logger;
_barrierFactory = barrierFactory;
_grpcBarrierFactory = grpcBarrierFactory;
}

[HttpGet("Test")]
public async Task<IActionResult> Test()
{
return this.Ok(nameof(this.Test));
}

[HttpPost("TransIn")]
public async Task<IActionResult> TransIn([FromBody] BusiRequest request)
{
_logger.LogInformation("TransIn req={req}", JsonSerializer.Serialize(request));

if (string.IsNullOrWhiteSpace(request.TransInResult) || request.TransInResult.Equals("SUCCESS"))
{
await Task.CompletedTask;
return Ok();
}
else if (request.TransInResult.Equals("FAILURE"))
{
return StatusCode(422, new { error = "FAILURE" }); // 422 Unprocessable Entity for business failure
}
else if (request.TransInResult.Equals("ONGOING"))
{
return StatusCode(425, new { error = "ONGOING" }); // 425 Too Early for ongoing state
}

return StatusCode(500, new { error = $"unknown result {request.TransInResult}" });
}

[HttpPost("TransOut")]
public async Task<IActionResult> TransOut([FromBody] BusiRequest request)
{
_logger.LogInformation("TransOut req={req}", JsonSerializer.Serialize(request));

if (string.IsNullOrWhiteSpace(request.TransOutResult) || request.TransOutResult.Equals("SUCCESS"))
{
await Task.CompletedTask;
return Ok();
}
else if (request.TransOutResult.Equals("FAILURE"))
{
return StatusCode(422, new { error = "FAILURE" }); // 422 Unprocessable Entity for business failure
}
else if (request.TransOutResult.Equals("ONGOING"))
{
return StatusCode(425, new { error = "ONGOING" }); // 425 Too Early for ongoing state
}

return StatusCode(500, new { error = $"unknown result {request.TransOutResult}" });
}
}
}
22 changes: 22 additions & 0 deletions tests/BusiGrpcService/Dtos/BusiRequest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using System.Text.Json.Serialization;

namespace BusiGrpcService.Dtos
{
public class BusiRequest
{
[JsonPropertyName("amount")]
public long Amount { get; set; }

[JsonPropertyName("transOutResult")]
public string TransOutResult { get; set; } = string.Empty;

[JsonPropertyName("transInResult")]
public string TransInResult { get; set; } = string.Empty;
}

public class BusiReply
{
[JsonPropertyName("message")]
public string Message { get; set; } = string.Empty;
}
}
23 changes: 17 additions & 6 deletions tests/BusiGrpcService/Program.cs
Original file line number Diff line number Diff line change
@@ -1,25 +1,36 @@
using BusiGrpcService;
using BusiGrpcService.Services;
using Dtmcli;
using Dtmgrpc;
using Microsoft.AspNetCore.Server.Kestrel.Core;

// Enable HTTP/2 support for unencrypted HTTP connections (required for gRPC over HTTP)
AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);

var builder = WebApplication.CreateBuilder(args);

builder.WebHost.ConfigureKestrel(options =>
builder.Services.AddGrpc(options =>
{
// Setup a HTTP/2 endpoint without TLS.
options.ListenLocalhost(5005, o => o.Protocols = HttpProtocols.Http2);
// Configure gRPC to allow unencrypted HTTP/2 connections (for local development)
options.EnableDetailedErrors = true;
});

builder.Services.AddGrpc();
builder.Services.AddDtmGrpc(x =>
{
x.DtmGrpcUrl = "http://localhost:36790";
});
builder.Services.AddDtmcli(option =>
{
option.DtmUrl = "http://localhost:36789";
});

// Add controllers for HTTP API
builder.Services.AddControllers();

var app = builder.Build();

// Configure the HTTP request pipeline.
app.MapGrpcService<BusiApiService>();
app.MapControllers(); // Map the HTTP API controllers
app.MapGet("/", () => "Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909");

app.Run();
app.Run();
5 changes: 2 additions & 3 deletions tests/BusiGrpcService/Properties/launchSettings.json
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
{
{
"profiles": {
"BusiGrpcService": {
"commandName": "Project",
"dotnetRunMessages": true,
"launchBrowser": false,
"applicationUrl": "http://localhost:5251;https://localhost:7251",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
}
}
}
}
15 changes: 11 additions & 4 deletions tests/BusiGrpcService/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,16 @@
}
},
"AllowedHosts": "*",
"Kestrel": {
"EndpointDefaults": {
"Protocols": "Http2"
"Kestrel": {
"Endpoints": {
"myHttp": {
"Url": "http://localhost:5005",
"Protocols": "Http2"
},
"myGRPC": {
"Url": "http://localhost:5006",
"Protocols": "Http1"
}
}
}
}
}
Loading
Loading