【Kafka】基于Windows的Kafka有关环境搭建、以及使用.NET环境开发的案例代码与演示

【Kafka】基于Windows的Kafka有关环境搭建、以及使用.NET环境开发的案例代码与演示

前言: 基于Windows系统下的Kafka环境搭建;以及使用.NET6环境进行开发简单的生产者与消费者的演示。

一、环境部署

Kafka是使用Java语言和Scala语言开发的,所以需要有对应的Java环境,以及Scala语言环境。

Java环境配置,如果不清楚的,可以查看鄙人的另一篇博客:

cnblogs.com/weskynet/p/

1、Scala环境安装,需要先下载Scala语言包,下载地址:

scala-lang.org/download

要选择Binaries版本的环境,否则需要自己编译:

2、Kafka基于Zookeeper环境运行,zookeeper提供给kafka一系列的功能支持,所以还需要安装Zookeeper有关的环境。下载zookeeper地址:

zookeeper.apache.org/re

3、同样,Zookeeper也需要下载带bin的链接,没有带bin的链接,可能是源码,需要自己编译:

4、接下来是下载主角,Kafka了。下载地址:

kafka.apache.org/downlo

5、同样需要选择下载binary版本,然后根据scala的版本选择对应的版本。

6、下载的三个安装包,如图所示:

7、先安装Scala语言包环境:

8、验证Scala语言包是否安装成功:

控制台窗口,输入:scala-version

如果提示类似如下有关版本信息,则代表安装成功。

9、然后是安装zookeeper环境。必须先启动zookeeper,才可以使用kafka。

安装zookeeper环境,先解压下载的包,然后在解压后的目录下新增data文件夹

10、然后复制data文件夹的绝对路径,备用。在conf文件夹下,编辑cfg文件

11、在cfg文件内,修改dataDir指定为上面新建的data文件夹的绝对路径。注意路径是斜杠/,如果要使用\反斜杆,需要写双反斜杠\\

12、也要更改cfg格式的文件名称为zoo.cfg否则zookeeper无法识别配置文件。Zoo.cfg文件是zookeeper启动时候自动关联的默认配置文件名称。

13、然后新建环境变量ZOOKEEPER_HOME:

14、环境变量path新增:%ZOOKEEPER_HOME%\bin

15、启动zookeeper,直接任意打开控制台,输入zkServer

16、如果都没有报错,一般是启动成功了的。再次验证下,可以任意开个控制台,输入JPS进行查看,如下图所示,有JPS、也有QuorumPeerMain,代表zookeeper启动成功了。

17、Kafka环境安装。先解压,然后在解压后的目录下,新增logs文件夹

18、然后在Config文件夹下,修改server.properties文件,修改log.dirs的值为新增的logs文件夹的绝对路径

19、进入到解压后的kafka目录下,在路径栏输入cmd,快速打开当前文件夹下的控制台窗口:

20、输入命令:

.\bin\windows\kafka-server-start.bat.\config\server.properties

进行启动Kafka服务:

21、启动Kafka报错了,可能是版本问题,kafka一般新版本对windows环境不友好,所以降级一下。此处我把kafka3.0降级为2.8:

22、此处我下载的版本为2.13-2.8.1,各位大佬们可以按照自己意愿选择版本。可能2.x版本和3.x版本跨度比较大,所以3.0版本没法玩。

23、然后是重复以上配置kafka有关的动作,修改有关配置文件以及新增logs文件夹等。此处省略。

24、接着在低版本的kafka目录下,快速进入当前解压缩的目录下,再次输入有关命令尝试一下:

25、没有提示错误,根据提示信息,代表是启动成功了。任意打开控制台,再输入JPS查看下,可以看到Kafka,确认是启动OK了。

26、然后是要一款Kafka可视化工具,此处我选择使用offsetexplorer(原来是叫kafkatools,如下载地址所示),下载地址:

kafkatool.com/download.

27、安装可视化工具,默认可以一直下一步:

28、可以在安装目录下把可执行程序发送到桌面快捷方式,方便打开。

29、一些配置,包括名称、kafka版本、端口号、服务地址等

30、连接以后的效果图,如下。Topic是空的,接下来写点代码。

二、代码开发与测试

31、新建类库项目,当作kafka服务类库

32、此处选择标准库2.1,用于可以给多种.netcore版本使用,方便兼容。

33、引用Confluent.Kafka包。

34、此处新增发布服务类和订阅服务类:

35、新增的生产者发布服务方法代码如下:

代码:

/// <summary> /// Description: Kafka生产者发布服务 /// CreateTime: 2022/1/21 19:35:27 /// Author: Wesky /// </summary> public class PublishService: IPublishService public async Task PublishAsync<TMessage>(string broker, string topicName, TMessage message) where TMessage : class var config = new ProducerConfig BootstrapServers = broker, // kafka服务集群,例如 "192.168.0.1:9092,192.168.0.2:9092" 或者单机 "192.168.0.1:9092" Acks = Acks.All, MessageSendMaxRetries = 3, // 发送失败重试的次数 using (var producer = new ProducerBuilder<string, string>(config).Build()) string data = Newtonsoft.Json.JsonConvert.SerializeObject(message); var sendData = new Message<string, string> { Key = Guid.NewGuid().ToString("N"), Value = data}; var report = await producer.ProduceAsync(topicName, sendData); Console.WriteLine($"消息 >>>>>: {data} \r\n发送到:{report.TopicPartitionOffset}"); catch (ProduceException<string, string> ex) Console.WriteLine($"消息发送失败>>>>>:\r\n Code= {ex.Error.Code} >>> \r\nError= {ex.Message}"); }

36、新增的消费者接收服务方法代码如下:

代码:

/// <summary> /// Description: kafka 消费者订阅服务 /// CreateTime: 2022/1/21 19:36:25 /// Author: Wesky /// </summary> public class SubscribeService: ISubscribeService /// <summary> /// 消费者服务核心代码 /// </summary> /// <typeparam name="TMessage"></typeparam> /// <param name="config">消费者配置信息</param> /// <param name="topics">主题集合</param> /// <param name="func"></param> /// <param name="cancellationToken"></param> /// <returns></returns> public async Task SubscribeAsync<TMessage>(ConsumerConfig config, IEnumerable<string> topics, Action<TMessage> func, CancellationToken cancellationToken) where TMessage : class const int commitPeriod = 1; using (var consumer = new ConsumerBuilder<Ignore, string>(config) .SetErrorHandler((_, e) => Console.WriteLine($"消费错误 >>>>>: {e.Reason}"); .SetStatisticsHandler((_, json) => Console.WriteLine($"************************************************"); .SetPartitionsAssignedHandler((c, partitionList) => string partitions = string.Join(", ", partitionList); Console.WriteLine($"分配的分区 >>>>> : {partitions}"); .SetPartitionsRevokedHandler((c, partitionList) => string partitions = string.Join(", ", partitionList); Console.WriteLine($"回收的分区 >>>>> : {partitions}"); .Build()) consumer.Subscribe(topics); while (true) var consumeResult = consumer.Consume(cancellationToken); if (consumeResult.IsPartitionEOF) continue; if(consumeResult?.Offset % commitPeriod == 0){ var result = JsonConvert.DeserializeObject<TMessage>(consumeResult.Message?.Value); func(result); // 消费消息 catch (Exception ex) Console.WriteLine($"消费业务处理失败: {ex.Message}"); consumer.Commit(consumeResult); // 手动提交 Console.WriteLine($"消费者消费完成,已提交 "); catch (KafkaException e) Console.WriteLine($"提交错误 >>>>> : {e.Error.Reason}"); catch (ConsumeException e) Console.WriteLine($"消费错误>>>>> : {e.Error.Reason}"); catch (Exception e) Console.WriteLine($"其他错误 >>>>> :{e.Message}"); consumer.Close(); await Task.CompletedTask; }

37、并且提供对应的接口服务,用于开放给外部调用,或者提供依赖注入使用:

38、新建一个控制台项目,用来当作消费者端的测试,并且新增一个方法,用来当作消费者接收到消息以后的业务处理方法体。此处控制台环境版本为.NET 6

39、消费客户端代码如下。其中,BootstrapServers也可以提供集群地址,例如ip1:port,ip2:port……服务之间以半角逗号隔开。

40、再新增一个webapi项目,用来当作生产者的客户端进行发送数据。以及对kafka服务类部分进行依赖注入注册,此处使用单例。该webapi此处使用.NET 6环境,带有控制器的模式。

41、新增的控制器里面,进行生产者的注入与实现。注意:topicName参数对应上边的topic-wesky,通过主题绑定,否则消费者不认识就没办法消费到了。

控制器代码:

[Route("api/[controller]/[action]")] [ApiController] public class ProducerController : ControllerBase IPublishService _service = null; public ProducerController(IPublishService publishService) _service = publishService; [HttpPost]