之前的文章 消息队列--Kafka入门 简单介绍了一下kafka的相关概念,这次来实际操作一下kafka

操作Kafka之前需要先部署一下kafka集群,为了避免本机安装出现各种奇奇怪怪的错误,这里使用docker部署kafka。

开始之前首先需要对docker compose有些了解。可以参考这篇文章进行了解 Docker composer介绍

接着新建一个目录,创建docker-compose.yml文件

version: '3'
services:
    zookeeper: 
        image: wurstmeister/zookeeper:3.4.6 
        volumes: 
            - ./zookeeper_data:/opt/zookeeper-3.4.6/data 
        container_name: zookeeper 
        ports: 
            - "2181:2181" 
            - "2182:2182" 
        restart: always
    kafka: 
        image: wurstmeister/kafka 
        container_name: kafka_01 
        depends_on: 
            - zookeeper 
        ports: 
            - "9092:9092" 
        volumes: 
            - ./kafka_log:/kafka 
        environment: 
            - KAFKA_ADVERTISED_HOST_NAME=localhost
            - KAFKA_BROKER_ID=0 
            - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092                     # kafka tcp listen ip
            - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092        # kafka broker listen ip
            - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT 
            - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 
            - KAFKA_HEAP_OPTS=-Xmx512M -Xms16M
        restart: always
    kafka3: 
        image: wurstmeister/kafka 
        container_name: kafka_03 
        depends_on: 
            - zookeeper 
        ports: 
            - "9094:9094" 
        volumes: 
            - ./kafka_log:/kafka3 
        environment: 
            - KAFKA_ADVERTISED_HOST_NAME=localhost
            - KAFKA_BROKER_ID=2 
            - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9094                     # kafka tcp listen ip
            - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka3:9094        # kafka broker listen ip
            - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT 
            - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 
            - KAFKA_HEAP_OPTS=-Xmx512M -Xms16M
        restart: always
    kafka2: 
        image: wurstmeister/kafka 
        container_name: kafka_02 
        depends_on: 
            - zookeeper 
        ports: 
            - "9093:9093" 
        volumes: 
            - ./kafka_log:/kafka2 
        environment: 
            - KAFKA_ADVERTISED_HOST_NAME=localhost
            - KAFKA_BROKER_ID=1 
            - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093                     # kafka tcp listen ip
            - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:9093        # kafka broker listen ip
            - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT 
            - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 
            - KAFKA_HEAP_OPTS=-Xmx512M -Xms16M
        restart: always
# kafka cluster manager dashboard
    kafka_manager: 
        image: sheepkiller/kafka-manager 
        ports: 
            - "9000:9000" 
        environment: 
            - ZK_HOSTS=zookeeper:2181 
        depends_on: 
            - zookeeper 
            - kafka 
        restart: always
docker-compose up -d

这个命令会根据docker-compose.yml文件的配置内容拉取镜像,生成容器,这几个容器共同组成我们这个kafka集群项目.

创建totpic

进入kafka容器当中(使用Docker Desktop可以直接进入,终端执行)创建需要知道zookeeper的地址

docker exec -it kafka容器id /bin/bash
# 进入kafka目录
cd /opt/kafka_2.13-2.8.1/
bin/kafka-topics.sh --create --zookeeper 172.19.0.2:2181 --replication-factor 1 -partitions 1 --topic test

可以直接访问localhost:9000使用kafka manager创建kafka集群和创建topic,更方便

ps.docker composer 执行的时候会创建一个默认的网络 文件夹_default,几个serveice 同处于一个网络当中,可以通过service name 互相访问.

比较需要关注的是以下两个参数

  • KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093
  • KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:9093
  • listener表示tcp监听本地9093端口,用于接收信息

    advertised_listener应该指的是kafka broker监听的外网的地址(或者说暴露给zookeeper的地址?),需要填写node ip,这里因为在一个项目里,使用服务名(hostname)来标示? 有对docker 网络设置比较了解的朋友可以帮忙讲解一下

    像上面yml文件这么配置后,还需要在宿主机的/etc/host文件配置,否则写代码pub/sub会保存找不到host

    127.0.0.1   kafka
    127.0.0.1   kafka2
    127.0.0.1   kafka3
    

    目前集群当中只有三个broker,即三个kafka服务器/容器。 我们创建两个topic,一个topic只有一个partition,另一个topic有两个partition分别位于两个broker上

    然后我们就可以编写代码来使用消息队列了

    Golang kafka消息队列使用

    package producer
    import (
       "fmt"
       "time"
       "github.com/Shopify/sarama"
    // 基于sarama第三方库开发的kafka client
    func Pub(msg sarama.ProducerMessage, array []string) {
       config := sarama.NewConfig()                              //使用默认配置
       config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
       config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
       config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回
       // 连接kafka
       client, err := sarama.NewSyncProducer(array, config)
       if err != nil {
          fmt.Println("producer closed, err:", err)
          return
       defer client.Close()
       var closeChan <-chan struct{}
       t := time.NewTicker(1 * time.Second)
       // 每秒发送一条消息
    Exit:
       for {
          select {
          case <-closeChan:
             t.Stop()
             break Exit
          case <-t.C:
             // 发送消息
             pid, offset, err := client.SendMessage(&msg)
             if err != nil {
                fmt.Println("send msg failed, err:", err)
                return
             fmt.Printf("topic:%v pid:%v offset:%v\n", msg.Topic, pid, offset)
    

    这里创建了一个同步生产者,每秒钟会向kafka集群生产一条消息,可以通过closeChan关闭发送,但代码没有用到

    package comsumer
    import (
       "fmt"
       "github.com/Shopify/sarama"
       "github.com/bsm/sarama-cluster"
       "log"
       "os/signal"
       "sync"
    // 支持brokers cluster的消费者
    func Sub(wg *sync.WaitGroup, brokers, topics []string, groupId string) {
       defer wg.Done()
       config := cluster.NewConfig()
       config.Consumer.Return.Errors = true
       config.Group.Return.Notifications = true
       config.Consumer.Offsets.Initial = sarama.OffsetNewest //使用最新offset
       // init consumer
       consumer, err := cluster.NewConsumer(brokers, groupId, topics, config)
       if err != nil {
          log.Printf("%s: sarama.NewSyncProducer err, message=%s \n", groupId, err)
          return
       defer consumer.Close()
       // trap SIGINT to trigger a shutdown
       signals := make(chan os.Signal, 1)
       signal.Notify(signals, os.Interrupt)
       // consume errors
       go func() {
          for err := range consumer.Errors() {
             log.Printf("%s:Error: %s\n", groupId, err.Error())
       // consume notifications
       go func() {
          for ntf := range consumer.Notifications() {
             log.Printf("%s:Rebalanced: %+v \n", groupId, ntf)
       // consume messages, watch signals
       var successes int
    Loop:
       for {
          select {
          case msg, ok := <-consumer.Messages():
             if ok {
                fmt.Fprintf(os.Stdout, "%s:%s/%d/%d\t%s\t%s\n", groupId, msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
                consumer.MarkOffset(msg, "") // mark message as processed
                successes++
          case <-signals:
             break Loop
       fmt.Fprintf(os.Stdout, "%s consume %d messages \n", groupId, successes)
    

    这里创建了一个集群消费者,指定了消费的topic和自己的消费者组ID

    package main
    import (
       "github.com/Shopify/sarama"
       "kafka_practise/comsumer"
       "kafka_practise/producer"
       "sync"
    var array = []string{"127.0.0.1:9092", "127.0.0.1:9093", "127.0.0.1:9094"}
    func main() {
       topic1 := []string{"test"}
       topic2 := []string{"test_2"}
       var wg = &sync.WaitGroup{}
       wg.Add(4)
       // 构造一个消息
       msg := sarama.ProducerMessage{}
       msg.Topic = "test_2"
       msg.Value = sarama.StringEncoder("this is a test log2")
       // 构造一个消息
       msg2 := sarama.ProducerMessage{}
       msg2.Topic = "test"
       msg2.Value = sarama.StringEncoder("this is a test log")
       go producer.Pub(msg, array)
       go producer.Pub(msg2, array)
       go comsumer.Sub(wg, array, topic1, "CG1")
       go comsumer.Sub(wg, array, topic2, "CG2")
       wg.Wait()
    

    main函数启动两个生产者王两个topic中发消息,启动两个消费者去消费两个topic的消息,消费的结果如下图

    现在一个简单的生产和消费的例子就完成了。test_2 topic有2个partition,CG2都会去消费,partition之间的消息不是有序的。

    后面可以再试试rebalance的情况以及再深入了解一下消息如何分发和处理的流程。 docker的配置也是一个头疼的地方,一开始因为这个网络的问题卡了半天,要么是broker之间访问不到,要么是宿主机访问不到,要么是集群起不了,有了解的大佬可以推荐一下相关的文章学习一下。

    分类:
    后端
    标签: