相关文章推荐
有胆有识的泡面  ·  go ...·  1 周前    · 
玩命的小蝌蚪  ·  SQL 序号 ...·  5 月前    · 
暗恋学妹的柳树  ·  ConvertRecord - 墨天轮·  1 年前    · 
冷冷的上铺  ·  javascript - Promise ...·  1 年前    · 
踢足球的鸵鸟  ·  linear image ...·  1 年前    · 

CAP 是一个EventBus,同时也是一个在微服务或者SOA系统中解决分布式事务问题的一个框架。它有助于创建可扩展,可靠并且易于更改的微服务系统。

这个项目中使用到SqlServer(数据库方面大同小异)、MongoDb、Kafka、RabbitMq以及Consul

1:引入相关Nuget就行

DotNetCore.CAP
DotNetCore.CAP.Kafka
DotNetCore.CAP.RabbitMQ
DotNetCore.CAP.SqlServer
DotNetCore.CAP.MongoDB
DotNetCore.CAP.Dashboard

2:在program中使用(我的项目是基于net6的)

builder.Services.AddDbContext<AppDbContext>(opt =>
    opt.UseSqlServer(builder.Configuration.GetSection("CAP:SqlServer").Value);
//Options, If you are using MongoDB
builder.Services.AddSingleton<IMongoClient>(new MongoClient(builder.Configuration.GetSection("CAP:MongoDB").Value));
builder.Services.AddCap(x =>
    // If you are using EF, you need to add the configuration:
    //x.UseEntityFramework<AppDbContext>(); //Options, Notice: You don't need to config x.UseSqlServer(""") again! CAP can autodiscovery.
    // If you are using ADO.NET, choose to add configuration you needed:
    x.UseSqlServer(builder.Configuration.GetSection("CAP:SqlServer").Value);
    //x.UseMySql("Your ConnectionStrings");
    //x.UsePostgreSql("Your ConnectionStrings");
    // If you are using MongoDB, you need to add the configuration:
    //x.UseMongoDB(opt => {
    //    opt.DatabaseConnection = builder.Configuration.GetSection("CAP:MongoDB").Value;
    //});  //注意,仅支持MongoDB 4.0+集群
    // CAP support RabbitMQ,Kafka,AzureService as the MQ, choose to add configuration you needed:
    x.UseRabbitMQ(opt =>
        opt.HostName = "192.168.1.7";
        opt.Port = 5672;
        opt.UserName = "admin";
        opt.Password = "admin";
        opt.VirtualHost = "/";
    x.UseKafka(opt =>
        opt.Servers = "192.168.1.12:9092";
    //x.UseRabbitMQ(builder.Configuration.GetSection("CAP:RabbitMQ").Value);
    x.UseDashboard();
    DiscoveryOptions discoveryOptions = new DiscoveryOptions();
    discoveryOptions.CurrentNodePort = 5173;
    builder.Configuration.Bind(discoveryOptions);
    // Register to Consul
    x.UseDiscovery(d =>
        d.DiscoveryServerHostName = "localhost";
        d.DiscoveryServerPort = 8500;
        d.CurrentNodeHostName = "localhost";
        d.CurrentNodePort = 5222;
        d.NodeId = "1";
        d.NodeName = "fanlin";
        d.Scheme = "http";
        d.MatchPath = "/api/HealthCheck";
    x.FailedRetryInterval = 10;//失败重试的间隔时间
    x.FailedRetryCount = 10;//失败重试的次数
    x.FailedThresholdCallback = info =>
        Console.WriteLine("Publish Message Error::" + info.Message);
    //x.UseKafka("ConnectionString");
    //x.UseAzureServiceBus("ConnectionString");
    //x.UseAmazonSQS();

3:新建PublishController,主要用于发布,代码很简单。代码如下

/// <summary>
    /// CAP的消费是自动消费的
    /// </summary>
    [Route("api/[controller]")]
    [ApiController]
    public class PublishController : ControllerBase
        private static string _publishName = "FanlinCAPDemo.Servces.Test";
        private readonly ICapPublisher _capBus;
        private readonly IConfiguration _configuration;
        private readonly AppDbContext _appDbContext;
        public PublishController(ICapPublisher capPublisher, IConfiguration configuration, AppDbContext appDbContext)
            _capBus = capPublisher;
            _configuration = configuration;
            _appDbContext = appDbContext;
        [HttpGet]
        [Route("no/transaction")]//根目录
        public async Task<IActionResult> WithoutTransaction()
            Console.WriteLine("普通----无事务");
            var user = _appDbContext.Users.Find("1");
            await _capBus.PublishAsync(_publishName, user);//应该把数据写到publish表
            return Ok();
        [HttpGet]
        [Route("adonet/transaction")]
        public IActionResult AdonetWithTransaction()
            Console.WriteLine("普通事务---连接字符串----事务");
            var user = _appDbContext.Users.Find("1");
            using (var connection = new SqlConnection(_configuration.GetSection("CAP:SqlServer").Value))
                using (var transaction = connection.BeginTransaction(_capBus, true))
                    //your business logic code
                    _capBus.Publish(_publishName, user);
            return Ok();
        [HttpGet]
        [Route("ef/transaction")]
        public IActionResult EntityFrameworkWithTransaction()
            Console.WriteLine("上下文---事务");
            var user = _appDbContext.Users.Find("1");
            //带header
            IDictionary<string, string?> dicHeader = new Dictionary<string, string?>();
            dicHeader.Add("Husband", "Fanlin");
            dicHeader.Add("Wife", "Baoting");
            dicHeader.Add("SumAge", "34");
            using (var trans = _appDbContext.Database.BeginTransaction(_capBus, autoCommit: true))
                //your business logic code
                _capBus.Publish(_publishName, user, dicHeader);
            return Ok();

4:新建ConsumerController 用于消费,代码很简单,需要注意,订阅需要标注特性CapSubscribe,其属性name就是发布者的_publishName,也可以顶一个多个group,在rabbitmq里面会自动新建相应的队列!代码如下

[Route("api/[controller]")]
    [ApiController]
    public class ConsumerController : ControllerBase
        private readonly AppDbContext _context;
        private readonly IMongoClient _client;
        private readonly ICapPublisher _capBus;
        public ConsumerController(AppDbContext appDbContext, IMongoClient mongoClient, ICapPublisher iCapPublisher)
            _context = appDbContext;
            _capBus = iCapPublisher;
            _client = mongoClient;
            var collection = _client.GetDatabase("MyCap").GetCollection<Users>("MyCap.User");
            collection.InsertOne(new Users()
                UserID = "3",
                UserName = "范小包",
                UserPy = "fxb",
                UserPwd = "1234",
                department = "1",
                userRemark = "jkl",
                UserWb = "abc",
                headImageFile = "none"
        [NonAction]
        [CapSubscribe("FanlinCAPDemo.Servces.Test")]
        public void CheckReceivedMessage(Users users, [FromCap] CapHeader header)
            Console.WriteLine($"{DateTime.Now} NoGroup Info:{users},Header:{header.Count}");
        [NonAction]
        [CapSubscribe("FanlinCAPDemo.Servces.Test", Group = "Group1")]
        public void CheckReceivedMessageGroup(Users users)
            Console.WriteLine($"{DateTime.Now} NoGroup Info:{users}");
            //这里不支持事务 使用事务会报错 
            //using (var session = _client.StartTransaction(_capBus, autoCommit: false))
                var collection = _client.GetDatabase("test").GetCollection<BsonDocument>("test.collection");
                collection.InsertOne( new BsonDocument { { "hello", "world" } });
                _capBus.Publish("sample.rabbitmq.mongodb", DateTime.Now);
                //session.CommitTransaction();

5:调用接口,查看kafka和rabbitmq以及相关数据库的变化

a:RabbitMQ:

这里自动新建了两个队列,如果有定义group已group属性为准,如果没有,名称是自定义的

public void ConfigureServices(IServiceCollection services) services.AddControllers();//配置WebAPI服务 services.AddTransient&l.. CAP是一款基于.net标准的库,该库是处理分布式事务的解决方案;它具有轻量级的、开源、易于使用、具有EventBus(事件总线)功能、持久化等特点。官方中文文档:https://cap.dotnetcore.xyz/user-guide/zh/getting-started/quick-start/ 基础核心Nuget包:DotNetCore.CAP EventBus(事件总线)Nuget包:DotNetCore.CAP.RabbitMQ、DotNetCore.CAP.Kafka、DotNetCore Install-package DotNetCore.CAP Install-package DotNetCore.CAP.RabbitMQ Install-package DotNetCore.CAP.SqlServer 使用方法 Startup->ConfigureServices中添加服务: //add CAP services.AddCap(x => 本地消息表模式本地消息表模式,其作为柔性事务的一种,核心是将一个分布式事务拆分为多个本地事务,事务之间通过事件消息衔接,事件消息和上个事务共用一个本地事务存储到本地消息表,再通过定时任务轮询本地消息表进行消息投递,下游业务订阅消息进行消费,本质上是依靠消息的重试机制达到最终一致性。其示意图如下所示,主要分为以下三步:本地业务数据和发布的事件消息共享同一个本地事务,进行数据落库,其中事件消息持久化到... CAP 是一个遵循 .NET Standard 标准库的C#库,用来处理分布式事务以及提供EventBus的功能,她具有轻量级,高性能,易使用等特点。 目前 CAP 使用的是 .NET Standard 1.6 的标准进行开发,目前最新预览版本已经支持 .NET Standard 2.0 ## CAP 的应用场景主要有以下两个: MITCAP 是一个在分布式系统中(SOA,MicroService)实现事件总线及最终一致性(分布式事务)的一个开源的 C# 库,她具有轻量级,高性能,易使用等特点。你可以轻松的在基于 .NET Core 技术的分布式系统中引入CAP,包括但限于 ASP.NET CoreASP.NET Core on .NET Framework。CAP 以 NuGet 包的形式提供,对项目无任何入侵,你仍然可以以你喜爱的方式来构建分布式系统。