diff --git a/src/DtmCommon/Imp/TransBase.cs b/src/DtmCommon/Imp/TransBase.cs index 1b81d67..6df7a77 100644 --- a/src/DtmCommon/Imp/TransBase.cs +++ b/src/DtmCommon/Imp/TransBase.cs @@ -1,4 +1,5 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Text.Json.Serialization; namespace DtmCommon @@ -76,6 +77,8 @@ public class TransBase [JsonIgnore] public string Dtm { get; set; } + public DateTime NextCronTime { get; set; } + public static TransBase NewTransBase(string gid, string transType, string dtm, string branchID) { return new TransBase diff --git a/src/Dtmcli/DtmTransFactory.cs b/src/Dtmcli/DtmTransFactory.cs index 2449773..06945e3 100644 --- a/src/Dtmcli/DtmTransFactory.cs +++ b/src/Dtmcli/DtmTransFactory.cs @@ -1,4 +1,6 @@ -namespace Dtmcli +using System; + +namespace Dtmcli { public class DtmTransFactory : IDtmTransFactory { @@ -16,7 +18,13 @@ public Msg NewMsg(string gid) var msg = new Msg(_cient, _branchBarrierFactory, gid); return msg; } - + + public Msg NewMsg(string gid, DateTime nextCronTime) + { + var msg = new Msg(_cient, _branchBarrierFactory, gid, nextCronTime); + return msg; + } + public Saga NewSaga(string gid) { var saga = new Saga(_cient, gid); diff --git a/src/Dtmcli/IDtmTransFactory.cs b/src/Dtmcli/IDtmTransFactory.cs index dc44883..f6b25c4 100644 --- a/src/Dtmcli/IDtmTransFactory.cs +++ b/src/Dtmcli/IDtmTransFactory.cs @@ -1,9 +1,13 @@ -namespace Dtmcli +using System; + +namespace Dtmcli { public interface IDtmTransFactory { Saga NewSaga(string gid); Msg NewMsg(string gid); + + Msg NewMsg(string gid, DateTime nextCronTime); } } diff --git a/src/Dtmcli/Msg/Msg.cs b/src/Dtmcli/Msg/Msg.cs index 520127a..3b80510 100644 --- a/src/Dtmcli/Msg/Msg.cs +++ b/src/Dtmcli/Msg/Msg.cs @@ -17,13 +17,22 @@ public class Msg private readonly IDtmClient _dtmClient; private readonly IBranchBarrierFactory _branchBarrierFactory; - public Msg(IDtmClient dtmHttpClient, IBranchBarrierFactory branchBarrierFactory, string gid) + public Msg(IDtmClient dtmHttpClient, IBranchBarrierFactory branchBarrierFactory, string gid): + this(dtmHttpClient, branchBarrierFactory, gid, default) + { + } + + public Msg(IDtmClient dtmHttpClient, IBranchBarrierFactory branchBarrierFactory, string gid, DateTime nextCronTime) { this._dtmClient = dtmHttpClient; this._branchBarrierFactory = branchBarrierFactory; this._transBase = TransBase.NewTransBase(gid, DtmCommon.Constant.TYPE_MSG, string.Empty, string.Empty); + if (nextCronTime != default(DateTime)) + { + this._transBase.NextCronTime = nextCronTime; + } } - + public Msg Add(string action, object postData) { if (this._transBase.Steps == null) this._transBase.Steps = new List>(); diff --git a/src/Dtmgrpc/DtmGImp/Utils.cs b/src/Dtmgrpc/DtmGImp/Utils.cs index d79a26c..6f05372 100644 --- a/src/Dtmgrpc/DtmGImp/Utils.cs +++ b/src/Dtmgrpc/DtmGImp/Utils.cs @@ -167,6 +167,8 @@ public static dtmgpb.DtmRequest BuildDtmRequest(TransBase transBase) Steps = transBase.Steps == null ? string.Empty : Utils.ToJsonString(transBase.Steps), RollbackReason = transBase.RollbackReason ?? string.Empty, }; + if (transBase.NextCronTime != default) + dtmRequest.NextCronTime = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(transBase.NextCronTime.ToUniversalTime()); foreach (var item in transBase.BinPayloads ?? new List()) { diff --git a/src/Dtmgrpc/DtmTransFactory.cs b/src/Dtmgrpc/DtmTransFactory.cs index 733f672..62339f1 100644 --- a/src/Dtmgrpc/DtmTransFactory.cs +++ b/src/Dtmgrpc/DtmTransFactory.cs @@ -1,4 +1,5 @@ -using DtmCommon; +using System; +using DtmCommon; using Dtmgrpc.DtmGImp; using Microsoft.Extensions.Options; @@ -19,10 +20,21 @@ public DtmTransFactory(IOptions optionsAccs, IDtmgRPCClient rpcClien public MsgGrpc NewMsgGrpc(string gid) { - var msg = new MsgGrpc(_rpcClient, _branchBarrierFactory, _options.DtmGrpcUrl.GetWithoutPrefixgRPCUrl(), gid); + return this.NewMsgGrpc(gid, default); + } + + /// + /// + /// + /// + /// The desired execution time, which can be used to delay downstream consumption + /// + public MsgGrpc NewMsgGrpc(string gid, DateTime nextCronTime) + { + var msg = new MsgGrpc(_rpcClient, _branchBarrierFactory, _options.DtmGrpcUrl.GetWithoutPrefixgRPCUrl(), gid, nextCronTime); return msg; } - + public SagaGrpc NewSagaGrpc(string gid) { var saga = new SagaGrpc(_rpcClient, _options.DtmGrpcUrl.GetWithoutPrefixgRPCUrl(), gid); diff --git a/src/Dtmgrpc/IDtmTransFactory.cs b/src/Dtmgrpc/IDtmTransFactory.cs index 90f4670..2e1ac19 100644 --- a/src/Dtmgrpc/IDtmTransFactory.cs +++ b/src/Dtmgrpc/IDtmTransFactory.cs @@ -1,10 +1,20 @@ -namespace Dtmgrpc +using System; + +namespace Dtmgrpc { public interface IDtmTransFactory { SagaGrpc NewSagaGrpc(string gid); MsgGrpc NewMsgGrpc(string gid); + + /// + /// + /// + /// + /// The desired execution time, which can be used to delay downstream consumption + /// + MsgGrpc NewMsgGrpc(string gid, DateTime nextCronTime); TccGrpc NewTccGrpc(string gid); } diff --git a/src/Dtmgrpc/Msg/MsgGrpc.cs b/src/Dtmgrpc/Msg/MsgGrpc.cs index 7bfefa2..4d1dc64 100644 --- a/src/Dtmgrpc/Msg/MsgGrpc.cs +++ b/src/Dtmgrpc/Msg/MsgGrpc.cs @@ -22,10 +22,19 @@ public class MsgGrpc private readonly IBranchBarrierFactory _branchBarrierFactory; public MsgGrpc(IDtmgRPCClient dtmHttpClient, IBranchBarrierFactory branchBarrierFactory, string server, string gid) + : this(dtmHttpClient, branchBarrierFactory, server, gid, default) + { + } + + public MsgGrpc(IDtmgRPCClient dtmHttpClient, IBranchBarrierFactory branchBarrierFactory, string server, string gid, DateTime nextCronTime) { this._dtmClient = dtmHttpClient; this._branchBarrierFactory = branchBarrierFactory; this._transBase = TransBase.NewTransBase(gid, Constant.TYPE_MSG, server, string.Empty); + if (nextCronTime != default(DateTime)) + { + this._transBase.NextCronTime = nextCronTime; + } } public MsgGrpc Add(string action, IMessage payload) diff --git a/src/Dtmgrpc/dtmgpb/dtmgimp.proto b/src/Dtmgrpc/dtmgpb/dtmgimp.proto index f2a6385..221f393 100644 --- a/src/Dtmgrpc/dtmgpb/dtmgimp.proto +++ b/src/Dtmgrpc/dtmgpb/dtmgimp.proto @@ -3,6 +3,7 @@ option csharp_namespace = "dtmgpb"; option go_package = "./dtmgpb"; import "google/protobuf/empty.proto"; +import "google/protobuf/timestamp.proto"; package dtmgimp; @@ -40,6 +41,7 @@ message DtmRequest { string Steps = 7; map ReqExtra = 8; string RollbackReason = 9; + google.protobuf.Timestamp NextCronTime = 10; } message DtmGidReply { diff --git a/tests/BusiGrpcService/BusiGrpcService.csproj b/tests/BusiGrpcService/BusiGrpcService.csproj index b896af4..2935141 100644 --- a/tests/BusiGrpcService/BusiGrpcService.csproj +++ b/tests/BusiGrpcService/BusiGrpcService.csproj @@ -12,6 +12,7 @@ + diff --git a/tests/BusiGrpcService/Controllers/BusiApiController.cs b/tests/BusiGrpcService/Controllers/BusiApiController.cs new file mode 100644 index 0000000..238d871 --- /dev/null +++ b/tests/BusiGrpcService/Controllers/BusiApiController.cs @@ -0,0 +1,72 @@ +using System.Text.Json; +using BusiGrpcService.Dtos; +using Microsoft.AspNetCore.Mvc; + +namespace BusiGrpcService.Controllers +{ + [ApiController] + [Route("http/busi.Busi")] + public class BusiApiController : ControllerBase + { + private readonly ILogger _logger; + private readonly Dtmcli.IBranchBarrierFactory _barrierFactory; + private readonly Dtmgrpc.IBranchBarrierFactory _grpcBarrierFactory; + + public BusiApiController(ILogger logger, Dtmcli.IBranchBarrierFactory barrierFactory, Dtmgrpc.IBranchBarrierFactory grpcBarrierFactory) + { + _logger = logger; + _barrierFactory = barrierFactory; + _grpcBarrierFactory = grpcBarrierFactory; + } + + [HttpGet("Test")] + public async Task Test() + { + return this.Ok(nameof(this.Test)); + } + + [HttpPost("TransIn")] + public async Task TransIn([FromBody] BusiRequest request) + { + _logger.LogInformation("TransIn req={req}", JsonSerializer.Serialize(request)); + + if (string.IsNullOrWhiteSpace(request.TransInResult) || request.TransInResult.Equals("SUCCESS")) + { + await Task.CompletedTask; + return Ok(); + } + else if (request.TransInResult.Equals("FAILURE")) + { + return StatusCode(422, new { error = "FAILURE" }); // 422 Unprocessable Entity for business failure + } + else if (request.TransInResult.Equals("ONGOING")) + { + return StatusCode(425, new { error = "ONGOING" }); // 425 Too Early for ongoing state + } + + return StatusCode(500, new { error = $"unknown result {request.TransInResult}" }); + } + + [HttpPost("TransOut")] + public async Task TransOut([FromBody] BusiRequest request) + { + _logger.LogInformation("TransOut req={req}", JsonSerializer.Serialize(request)); + + if (string.IsNullOrWhiteSpace(request.TransOutResult) || request.TransOutResult.Equals("SUCCESS")) + { + await Task.CompletedTask; + return Ok(); + } + else if (request.TransOutResult.Equals("FAILURE")) + { + return StatusCode(422, new { error = "FAILURE" }); // 422 Unprocessable Entity for business failure + } + else if (request.TransOutResult.Equals("ONGOING")) + { + return StatusCode(425, new { error = "ONGOING" }); // 425 Too Early for ongoing state + } + + return StatusCode(500, new { error = $"unknown result {request.TransOutResult}" }); + } + } +} \ No newline at end of file diff --git a/tests/BusiGrpcService/Dtos/BusiRequest.cs b/tests/BusiGrpcService/Dtos/BusiRequest.cs new file mode 100644 index 0000000..9bb889d --- /dev/null +++ b/tests/BusiGrpcService/Dtos/BusiRequest.cs @@ -0,0 +1,22 @@ +using System.Text.Json.Serialization; + +namespace BusiGrpcService.Dtos +{ + public class BusiRequest + { + [JsonPropertyName("amount")] + public long Amount { get; set; } + + [JsonPropertyName("transOutResult")] + public string TransOutResult { get; set; } = string.Empty; + + [JsonPropertyName("transInResult")] + public string TransInResult { get; set; } = string.Empty; + } + + public class BusiReply + { + [JsonPropertyName("message")] + public string Message { get; set; } = string.Empty; + } +} \ No newline at end of file diff --git a/tests/BusiGrpcService/Program.cs b/tests/BusiGrpcService/Program.cs index be2cc0d..c7eeea2 100644 --- a/tests/BusiGrpcService/Program.cs +++ b/tests/BusiGrpcService/Program.cs @@ -1,25 +1,36 @@ +using BusiGrpcService; using BusiGrpcService.Services; +using Dtmcli; using Dtmgrpc; using Microsoft.AspNetCore.Server.Kestrel.Core; +// Enable HTTP/2 support for unencrypted HTTP connections (required for gRPC over HTTP) +AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true); + var builder = WebApplication.CreateBuilder(args); -builder.WebHost.ConfigureKestrel(options => +builder.Services.AddGrpc(options => { - // Setup a HTTP/2 endpoint without TLS. - options.ListenLocalhost(5005, o => o.Protocols = HttpProtocols.Http2); + // Configure gRPC to allow unencrypted HTTP/2 connections (for local development) + options.EnableDetailedErrors = true; }); - -builder.Services.AddGrpc(); builder.Services.AddDtmGrpc(x => { x.DtmGrpcUrl = "http://localhost:36790"; }); +builder.Services.AddDtmcli(option => +{ + option.DtmUrl = "http://localhost:36789"; +}); + +// Add controllers for HTTP API +builder.Services.AddControllers(); var app = builder.Build(); // Configure the HTTP request pipeline. app.MapGrpcService(); +app.MapControllers(); // Map the HTTP API controllers app.MapGet("/", () => "Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909"); -app.Run(); +app.Run(); \ No newline at end of file diff --git a/tests/BusiGrpcService/Properties/launchSettings.json b/tests/BusiGrpcService/Properties/launchSettings.json index 5dd0f5f..be4c40f 100644 --- a/tests/BusiGrpcService/Properties/launchSettings.json +++ b/tests/BusiGrpcService/Properties/launchSettings.json @@ -1,13 +1,12 @@ -{ +{ "profiles": { "BusiGrpcService": { "commandName": "Project", "dotnetRunMessages": true, "launchBrowser": false, - "applicationUrl": "http://localhost:5251;https://localhost:7251", "environmentVariables": { "ASPNETCORE_ENVIRONMENT": "Development" } } } -} +} \ No newline at end of file diff --git a/tests/BusiGrpcService/appsettings.json b/tests/BusiGrpcService/appsettings.json index 95303fd..531dd56 100644 --- a/tests/BusiGrpcService/appsettings.json +++ b/tests/BusiGrpcService/appsettings.json @@ -7,9 +7,16 @@ } }, "AllowedHosts": "*", - "Kestrel": { - "EndpointDefaults": { - "Protocols": "Http2" + "Kestrel": { + "Endpoints": { + "myHttp": { + "Url": "http://localhost:5005", + "Protocols": "Http2" + }, + "myGRPC": { + "Url": "http://localhost:5006", + "Protocols": "Http1" + } } } -} +} \ No newline at end of file diff --git a/tests/Dtmgrpc.IntegrationTests/ITTestHelper.cs b/tests/Dtmgrpc.IntegrationTests/ITTestHelper.cs index 48a1b8f..d9a4abf 100644 --- a/tests/Dtmgrpc.IntegrationTests/ITTestHelper.cs +++ b/tests/Dtmgrpc.IntegrationTests/ITTestHelper.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; +using Dtmcli; using Dtmworkflow; using Xunit.Abstractions; using Xunit.Sdk; @@ -14,6 +15,7 @@ public class ITTestHelper public static string DTMHttpUrl = "http://localhost:36789"; public static string DTMgRPCUrl = "http://localhost:36790"; public static string BuisgRPCUrl = "localhost:5005"; + public static string BuisHttpUrl = "http://localhost:5006/http"; public static string BuisgRPCUrlWithProtocol = "http://localhost:5005"; private static System.Net.Http.HttpClient _client = new System.Net.Http.HttpClient(); @@ -83,6 +85,19 @@ public static ServiceProvider AddDtmGrpc(int dtmTimout = 10000) var provider = services.BuildServiceProvider(); return provider; } + + public static ServiceProvider AddDtmHttp(int dtmTimout = 10000) + { + var services = new ServiceCollection(); + services.AddLogging(); + services.AddHttpClient(); + services.AddDtmcli(option => + { + option.DtmUrl = DTMHttpUrl; + }); + var provider = services.BuildServiceProvider(); + return provider; + } public static string GetRedisAccountKey(int uid) => $"dtm:busi:redis-account-key-{uid}"; diff --git a/tests/Dtmgrpc.IntegrationTests/MsgGrpcTest.cs b/tests/Dtmgrpc.IntegrationTests/MsgGrpcTest.cs index fed4cdb..9128e4b 100644 --- a/tests/Dtmgrpc.IntegrationTests/MsgGrpcTest.cs +++ b/tests/Dtmgrpc.IntegrationTests/MsgGrpcTest.cs @@ -20,6 +20,7 @@ public async Task Submit_Should_Succeed() var gid = "msgTestGid" + Guid.NewGuid().ToString(); var msg = transFactory.NewMsgGrpc(gid); + msg.EnableWaitResult(); var req = ITTestHelper.GenBusiReq(false, false); var busiGrpc = ITTestHelper.BuisgRPCUrl; msg.Add(busiGrpc + "/busi.Busi/TransOut", req) @@ -27,8 +28,7 @@ public async Task Submit_Should_Succeed() await msg.Prepare(busiGrpc + "/busi.Busi/QueryPrepared"); await msg.Submit(); - - await Task.Delay(2000); + var status = await ITTestHelper.GetTranStatus(gid); Assert.Equal("succeed", status); } @@ -63,6 +63,38 @@ await branchBarrier.Call(conn, () => var status = await ITTestHelper.GetTranStatus(gid); Assert.Equal("succeed", status); } + + [Fact] + public async Task Submit_With_NextCronTime_Should_Succeed_Later() + { + var provider = ITTestHelper.AddDtmGrpc(); + var transFactory = provider.GetRequiredService(); + + var gid = "msgTestGid" + Guid.NewGuid().ToString(); + var msg = transFactory.NewMsgGrpc(gid, DateTime.Now.AddSeconds(10)); + var req = ITTestHelper.GenBusiReq(false, false); + var busiGrpc = ITTestHelper.BuisgRPCUrl; + msg.Add(busiGrpc + "/busi.Busi/TransOut", req) + .Add(busiGrpc + "/busi.Busi/TransIn", req); + + await msg.Prepare(busiGrpc + "/busi.Busi/QueryPrepared"); + await msg.Submit(); + + + // Since the downstream execution is delayed by 10 seconds, it will be 'submitted' after 2 seconds and 'succeed' after 15 seconds + await Task.Delay(TimeSpan.FromSeconds(0)); + var status = await ITTestHelper.GetTranStatus(gid); + Assert.Equal("submitted", status); + + await Task.Delay(TimeSpan.FromSeconds(2)); + status = await ITTestHelper.GetTranStatus(gid); + Assert.Equal("submitted", status); + + await Task.Delay(TimeSpan.FromSeconds(13)); + status = await ITTestHelper.GetTranStatus(gid); + Assert.Equal("succeed", status); + } + private static readonly int TransOutUID = 1; diff --git a/tests/Dtmgrpc.IntegrationTests/MsgHttpTest.cs b/tests/Dtmgrpc.IntegrationTests/MsgHttpTest.cs new file mode 100644 index 0000000..15eefcd --- /dev/null +++ b/tests/Dtmgrpc.IntegrationTests/MsgHttpTest.cs @@ -0,0 +1,66 @@ +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Data.Common; +using System.Threading.Tasks; +using System.Transactions; +using Dapper; +using Grpc.Core; +using MySqlConnector; +using Xunit; + +namespace Dtmgrpc.IntegrationTests +{ + public class MsgHttpTest + { + [Fact] + public async Task Submit_Should_Succeed() + { + var provider = ITTestHelper.AddDtmHttp(); + var transFactory = provider.GetRequiredService(); + + var gid = "msgTestGid" + Guid.NewGuid().ToString(); + var msg = transFactory.NewMsg(gid); + msg.EnableWaitResult(); + var req = ITTestHelper.GenBusiReq(false, false); + var busiGrpc = ITTestHelper.BuisHttpUrl; + msg.Add(busiGrpc + "/busi.Busi/TransOut", req) + .Add(busiGrpc + "/busi.Busi/TransIn", req); + + await msg.Prepare(busiGrpc + "/busi.Busi/QueryPrepared_404"); + await msg.Submit(); + + var status = await ITTestHelper.GetTranStatus(gid); + Assert.Equal("succeed", status); + } + + [Fact] + public async Task Submit_With_NextCronTime_Should_Succeed_Later() + { + var provider = ITTestHelper.AddDtmHttp(); + var transFactory = provider.GetRequiredService(); + + var gid = "msgTestGid" + Guid.NewGuid().ToString(); + var msg = transFactory.NewMsg(gid, DateTime.Now.AddSeconds(10)); + var req = ITTestHelper.GenBusiReq(false, false); + var busiGrpc = ITTestHelper.BuisHttpUrl; + msg.Add(busiGrpc + "/busi.Busi/TransOut", req) + .Add(busiGrpc + "/busi.Busi/TransIn", req); + + await msg.Prepare(busiGrpc + "/busi.Busi/QueryPrepared_404"); + await msg.Submit(); + + // Since the downstream execution is delayed by 10 seconds, it will be 'submitted' after 2 seconds and 'succeed' after 15 seconds + await Task.Delay(TimeSpan.FromSeconds(0)); + var status = await ITTestHelper.GetTranStatus(gid); + Assert.Equal("submitted", status); + + await Task.Delay(TimeSpan.FromSeconds(2)); + status = await ITTestHelper.GetTranStatus(gid); + Assert.Equal("submitted", status); + + await Task.Delay(TimeSpan.FromSeconds(13)); + status = await ITTestHelper.GetTranStatus(gid); + Assert.Equal("succeed", status); + } + } +} \ No newline at end of file