A previous article,
消息队列--Kafka入门 (Message Queue: Getting Started with Kafka),
gave a brief introduction to the concepts behind Kafka. This time, let's actually operate it.
Before working with Kafka we need to deploy a Kafka cluster. To avoid the assorted strange errors that come with installing directly on the local machine, we'll deploy Kafka with Docker.
Some familiarity with Docker Compose is needed first; this article is a good starting point:
Docker Compose 介绍 (Introduction to Docker Compose)
Next, create a new directory and add a docker-compose.yml file:
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    volumes:
      - ./zookeeper_data:/opt/zookeeper-3.4.6/data
    container_name: zookeeper
    ports:
      - "2181:2181"
      - "2182:2182"
    restart: always
  kafka:
    image: wurstmeister/kafka
    container_name: kafka_01
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    volumes:
      - ./kafka_log:/kafka
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=localhost
      - KAFKA_BROKER_ID=0
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms16M
    restart: always
  kafka2:
    image: wurstmeister/kafka
    container_name: kafka_02
    depends_on:
      - zookeeper
    ports:
      - "9093:9093"
    volumes:
      - ./kafka_log:/kafka2
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=localhost
      - KAFKA_BROKER_ID=1
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:9093
      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms16M
    restart: always
  kafka3:
    image: wurstmeister/kafka
    container_name: kafka_03
    depends_on:
      - zookeeper
    ports:
      - "9094:9094"
    volumes:
      - ./kafka_log:/kafka3
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=localhost
      - KAFKA_BROKER_ID=2
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9094
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka3:9094
      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms16M
    restart: always
  kafka_manager:
    image: sheepkiller/kafka-manager
    ports:
      - "9000:9000"
    environment:
      - ZK_HOSTS=zookeeper:2181
    depends_on:
      - zookeeper
      - kafka
    restart: always
docker-compose up -d
This command pulls the images and creates the containers according to docker-compose.yml; together these containers make up our Kafka cluster project.
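To confirm the cluster came up, you can list the service status from the directory that holds docker-compose.yml:

docker-compose ps

All five containers should show State as Up; docker logs kafka_01 is the first place to look if a broker keeps restarting.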
Create a topic
Enter the Kafka container (with Docker Desktop you can open a terminal in the container directly). Creating a topic requires the ZooKeeper address:
docker exec -it <kafka container id> /bin/bash
cd /opt/kafka_2.13-2.8.1/
bin/kafka-topics.sh --create --zookeeper 172.19.0.2:2181 --replication-factor 1 --partitions 1 --topic test
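To double-check the result, the same script can list and describe topics. Inside the Compose network you can also use zookeeper:2181 instead of the container IP (172.19.0.2 is just what my run happened to assign):

bin/kafka-topics.sh --list --zookeeper zookeeper:2181
bin/kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic test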
Alternatively, you can open localhost:9000 and use Kafka Manager to register the cluster and create topics, which is more convenient.
P.S. When Docker Compose runs, it creates a default network named <directory>_default; the services all sit on this network and can reach one another by service name.
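You can verify this yourself. The network name below assumes the project directory is called kafka_practise; adjust it to match yours:

docker network ls
docker network inspect kafka_practise_default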
The two parameters most worth paying attention to are:
KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:9093
KAFKA_LISTENERS tells the broker to bind TCP port 9093 on all local interfaces; this is the socket that actually accepts connections.
KAFKA_ADVERTISED_LISTENERS is the address the broker registers in ZooKeeper and returns to clients in metadata responses; after the initial bootstrap connection, clients use this address for all further traffic, so it must be resolvable and reachable from wherever the client runs. Since all of the services here share one Compose network, the service name (hostname) works as the advertised address.
With the YAML configured as above, you also need to add entries to the host machine's /etc/hosts file, otherwise the pub/sub code will fail with a host-not-found error:
127.0.0.1 kafka
127.0.0.1 kafka2
127.0.0.1 kafka3
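One way to see why this is needed is to ask the cluster for its metadata and print the broker addresses it advertises. Here is a minimal sketch with sarama (the bootstrap address and the lack of retry logic are illustrative):

package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// Bootstrap against one of the host-mapped ports.
	client, err := sarama.NewClient([]string{"127.0.0.1:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Brokers() returns the brokers as advertised in the cluster metadata:
	// kafka:9092, kafka2:9093, kafka3:9094 — hence the /etc/hosts entries.
	for _, b := range client.Brokers() {
		fmt.Println(b.Addr())
	}
}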
The cluster currently holds three brokers, i.e. three Kafka servers/containers.
We'll create two topics: one with a single partition, and another with two partitions placed on two different brokers, as shown below.
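For example, run from inside one of the broker containers (replication factor is kept at 1 here just to keep the example small):

bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 2 --topic test_2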
After that we can write some code to use the message queue.
Using the Kafka message queue from Go
package producer

import (
	"fmt"
	"time"

	"github.com/Shopify/sarama"
)

// Pub creates a synchronous producer and sends msg to the cluster once per second.
func Pub(msg sarama.ProducerMessage, array []string) {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // wait for all in-sync replicas to ack
	config.Producer.Partitioner = sarama.NewRandomPartitioner // pick a random partition
	config.Producer.Return.Successes = true                   // required by SyncProducer

	client, err := sarama.NewSyncProducer(array, config)
	if err != nil {
		fmt.Println("producer closed, err:", err)
		return
	}
	defer client.Close()

	var closeChan <-chan struct{} // nil: this case never fires, see below
	t := time.NewTicker(1 * time.Second)
Exit:
	for {
		select {
		case <-closeChan:
			t.Stop()
			break Exit
		case <-t.C:
			pid, offset, err := client.SendMessage(&msg)
			if err != nil {
				fmt.Println("send msg failed, err:", err)
				return
			}
			fmt.Printf("topic:%v pid:%v offset:%v\n", msg.Topic, pid, offset)
		}
	}
}
This creates a synchronous producer that sends one message to the Kafka cluster every second. Sending could be stopped through closeChan, but the code never signals it: a receive from a nil channel blocks forever, so that case simply never fires.
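If you did want to stop the producer from outside, one option is to pass the channel in and close it from the caller. This is just a sketch of the pattern with the Kafka call stubbed out, not part of the original code:

package main

import (
	"fmt"
	"time"
)

// pubLoop mimics Pub's send loop: a ticker drives the work, and closing
// done makes the <-done case fire so the loop exits.
func pubLoop(done <-chan struct{}) {
	t := time.NewTicker(1 * time.Second)
	defer t.Stop()
	for {
		select {
		case <-done:
			fmt.Println("producer stopped")
			return
		case <-t.C:
			fmt.Println("send one message") // SendMessage would go here
		}
	}
}

func main() {
	done := make(chan struct{})
	go pubLoop(done)
	time.Sleep(5 * time.Second)
	close(done) // signal shutdown
	time.Sleep(100 * time.Millisecond)
}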
package comsumer

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"

	"github.com/Shopify/sarama"
	cluster "github.com/bsm/sarama-cluster"
)

// Sub joins the consumer group groupId and consumes the given topics.
func Sub(wg *sync.WaitGroup, brokers, topics []string, groupId string) {
	defer wg.Done()

	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true
	config.Consumer.Offsets.Initial = sarama.OffsetNewest

	consumer, err := cluster.NewConsumer(brokers, groupId, topics, config)
	if err != nil {
		log.Printf("%s: cluster.NewConsumer err, message=%s \n", groupId, err)
		return
	}
	defer consumer.Close()

	// trap SIGINT to trigger a graceful shutdown
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// drain errors
	go func() {
		for err := range consumer.Errors() {
			log.Printf("%s:Error: %s\n", groupId, err.Error())
		}
	}()

	// drain rebalance notifications
	go func() {
		for ntf := range consumer.Notifications() {
			log.Printf("%s:Rebalanced: %+v \n", groupId, ntf)
		}
	}()

	var successes int
Loop:
	for {
		select {
		case msg, ok := <-consumer.Messages():
			if ok {
				fmt.Fprintf(os.Stdout, "%s:%s/%d/%d\t%s\t%s\n", groupId, msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
				consumer.MarkOffset(msg, "") // mark message as processed
				successes++
			}
		case <-signals:
			break Loop
		}
	}

	fmt.Fprintf(os.Stdout, "%s consume %d messages \n", groupId, successes)
}
This creates a cluster consumer, specifying the topics to consume and its own consumer group ID. Errors and rebalance notifications are drained on separate goroutines, and each message is marked with MarkOffset so the group's committed offsets advance.
package main

import (
	"sync"

	"github.com/Shopify/sarama"

	"kafka_practise/comsumer"
	"kafka_practise/producer"
)

var array = []string{"127.0.0.1:9092", "127.0.0.1:9093", "127.0.0.1:9094"}

func main() {
	topic1 := []string{"test"}
	topic2 := []string{"test_2"}

	var wg = &sync.WaitGroup{}
	// Only the two consumers call wg.Done(); the extra two counts keep
	// main (and therefore the producers) alive after the consumers exit.
	wg.Add(4)

	msg := sarama.ProducerMessage{}
	msg.Topic = "test_2"
	msg.Value = sarama.StringEncoder("this is a test log2")

	msg2 := sarama.ProducerMessage{}
	msg2.Topic = "test"
	msg2.Value = sarama.StringEncoder("this is a test log")

	go producer.Pub(msg, array)
	go producer.Pub(msg2, array)
	go comsumer.Sub(wg, array, topic1, "CG1")
	go comsumer.Sub(wg, array, topic2, "CG2")

	wg.Wait()
}
The main function starts two producers that send messages to the two topics and two consumers that consume them; the output looks like the screenshot below.
With that, a simple produce-and-consume example is complete. The test_2 topic has 2 partitions and CG2 consumes both of them; Kafka only guarantees ordering within a single partition, so messages from different partitions arrive interleaved.
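If per-key ordering matters, one common approach is to switch the producer to a hash partitioner and set a message key, so every message with the same key lands in the same partition. A sketch (the key "user-42" is made up for illustration):

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Return.Successes = true
	// Hash the key instead of choosing a random partition.
	config.Producer.Partitioner = sarama.NewHashPartitioner

	p, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer p.Close()

	// All messages with the same Key hash to the same partition,
	// so messages for "user-42" are consumed in send order.
	for i := 0; i < 3; i++ {
		if _, _, err := p.SendMessage(&sarama.ProducerMessage{
			Topic: "test_2",
			Key:   sarama.StringEncoder("user-42"), // hypothetical key
			Value: sarama.StringEncoder("ordered message"),
		}); err != nil {
			log.Fatal(err)
		}
	}
}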
Next I'd like to try out rebalancing and dig deeper into how messages are routed and processed. The Docker configuration is another headache: I was stuck on the networking for quite a while at the start, with brokers unable to reach each other, the host unable to reach the brokers, or the cluster failing to start at all. If anyone knows good articles on Docker networking, recommendations are welcome.