上一章已经讲解了Spring Boot集成Kafka基本使用可以参考 juejin.cn/post/707847… 但是实际的项目中需要考虑相关安全性问题,本文将详细的讲解Kafka的SASL认证。

SASL基础配置

1.打开server.properties,添加如下信息

#修改认证为SASL
listeners=SASLL_PLAINTEXT://localhost:9092
advertised.listeners=SASL_PLAINTEXT://17.18.188.121:9092  
#add SALS
security.inter.broker.protocol=SASL_PLAINTEXT
# 表示开启PLAIN认证机制
sasl.enabled.mechanisms=PLAIN
# 表示Broker间通信也启用PLAIN机制
sasl.mechanism.inter.broker.protocol=PLAIN
# 授权方面
# 设置身份验证使用的类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# 设置超级账号,如果是多个需要分号分割,例如:User:admin;User:root
super.users=User:admin
# 对所有用户topic可见
allow.everyone.if.no.acl.found=true

2.在/usr/local/kafka/config目录下,新增kafka_server_jaas.conf认证文件内容如下

 KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin"; //用户名为admin,密码为admin

说明:1.user_admin="admin",指的是客户端账号用户名为admin,密码为admin。 2.需要注意后面两个分号。

3.kafka-server-start.sh文件的添加认证信息,内容如下:

if [ "x$KAFKA_OPTS" ]; then
    export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf"

4.打开zookeeper.properties文件,末尾添加如下信息

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider  #开启认证功能
requireClientAuthScheme=sasl   #认证方式为sasl
jaasLoginRenew=3600000

说明:zookeeper需要开启SASl认证

5.在/usr/local/kafka/config目录下新增zookeeper认证zookeeper_jaas.conf文件,内容如下:

 ZKServer {
org.apache.kafka.common.security.plain.PlainLoginModule required 
    username="admin" 
    password="admin" 
    user_admin="admin" 

6.zookeeper-server-start.sh添加认证信息,内容如下

if [ "x$KAFKA_OPTS" ]; then
    export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/kafka/config/zookeeper_jaas.conf"

生产者+消费者配置

1.在/usr/local/kafka/config目录下,新增kafka_client_jaas.conf认证文件内容如下

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin";

2.打开produce.properties文件,末尾添加如下信息

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

3.kafka-console-producer.sh文件的添加认证信息,内容如下

if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/config/kafka_client_jaas.conf"

4.打开consumer.properties文件末尾添加如下信息

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

5.kafka-console-consumer.sh文件的添加认证内容如下:

if [ "x$KAFKA_OPTS" ]; then
  export KAFKA_OPTS="-Djava.security.auth.login.config=usr/local/config/kafka_client_jaas.conf"
创建topic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1
./kafka-console-producer.sh --broker-list localhost:9092 --topic test1  --producer.config  ../config/producer.properties
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1  --consumer.config   ../config/consumer.properties

生产者发送消息,消费者能收到消息则代表配置成功。

Spring Boot 集成

1.新建客户端认证文件kafka_client_jaas.conf,内容如下

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin";

注意:客户端配置的用户名和密码需要与服务端(kafka_server_jaas.conf)文件指定的用户名和密码一致。

2.application.yml文件中添加kafka配置

kafka:
    bootstrap-servers: localhost:9092
    listener:
      missing-topics-fatal: false
   # 配置生产者
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        sasl:
          mechanism: PLAIN
        security:
          protocol: SASL_PLAINTEXT
    # 配置消费者
    consumer:
      group-id: test-consumer-group-01
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        sasl:
          mechanism: PLAIN
        security:
          protocol: SASL_PLAINTEXT
    #timeout: 500
    client-id: test-01

3.编写生产者

@RestController
@RequestMapping("/kafka")
public class KafkaProducer
    private static final Logger logger =    LoggerFactory.getLogger(KafkaProducer.class);
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @RequestMapping("/send")
     public void send(){
         Message message = new Message();
         message.setId(UUID.randomUUID().toString());
         message.setSendTime(new Date());
         message.setContent("这是一个测试消息");
         logger.info("--------------- message = {}", JSON.toJSONString(message));
         //topic为test
         kafkaTemplate.send("test", JSON.toJSONString(message));

4.编写消费者

 #topics为创建的topic的名称
 @KafkaListener(topics = {"test"})
    public void listen(ConsumerRecord<?, ?> record) 
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) 
            Object message = kafkaMessage.get();
            logger.info("====>接收到kafka消息: topic = {}, offset = {} \n", record.topic(), record.offset());
            logger.info(" kafka message:{}" + message);

5.项目启动配置中加载客户端认证,eclipse配置如下:

-Djava.security.auth.login.config=E:\test\kafka_client_jaas.conf

可能遇到的错误

Kafka的SASL认证配置的过程比较复杂,其中也遇到了一些问题,我在这里列举了一些错误和相关的解决办法。

1.启动时报 Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set

解决方案:项目启动时,需要添加认证信息,详细参考客户端配置第5条

2.failed authentication due to: Authentication failed: Invalid username or password

解决方案:检查客户端配置的用户名和密码是否与服务器端认证的用户名和密码一致。

3.生产者进程启动报错

解决方案:启动生产者测试需要添加配置文件

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1  --consumer.config   ../config/consumer.properties

4.Exception while loading Zookeeper JAAS login context 'Client'

解决方案:zookeeper的启动需要添加SASL认证

Kafka的SASL认证的配置相对比较复杂,如果大家在配置的过程中有相关问题,可以及时反馈。

分类:
后端
标签: