上一章已经讲解了Spring Boot集成Kafka基本使用可以参考
juejin.cn/post/707847…
但是实际的项目中需要考虑相关安全性问题,本文将详细的讲解Kafka的SASL认证。
SASL基础配置
1.打开server.properties,添加如下信息
#修改认证为SASL
listeners=SASLL_PLAINTEXT://localhost:9092
advertised.listeners=SASL_PLAINTEXT://17.18.188.121:9092
#add SALS
security.inter.broker.protocol=SASL_PLAINTEXT
# 表示开启PLAIN认证机制
sasl.enabled.mechanisms=PLAIN
# 表示Broker间通信也启用PLAIN机制
sasl.mechanism.inter.broker.protocol=PLAIN
# 授权方面
# 设置身份验证使用的类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# 设置超级账号,如果是多个需要分号分割,例如:User:admin;User:root
super.users=User:admin
# 对所有用户topic可见
allow.everyone.if.no.acl.found=true
2.在/usr/local/kafka/config目录下,新增kafka_server_jaas.conf认证文件内容如下
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
说明:1.user_admin="admin",指的是客户端账号用户名为admin,密码为admin。
2.需要注意后面两个分号。
3.kafka-server-start.sh文件的添加认证信息,内容如下:
if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf"
4.打开zookeeper.properties文件,末尾添加如下信息
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
说明:zookeeper需要开启SASl认证
5.在/usr/local/kafka/config目录下新增zookeeper认证zookeeper_jaas.conf文件,内容如下:
ZKServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
6.zookeeper-server-start.sh添加认证信息,内容如下
if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/kafka/config/zookeeper_jaas.conf"
生产者+消费者配置
1.在/usr/local/kafka/config目录下,新增kafka_client_jaas.conf认证文件内容如下
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
2.打开produce.properties文件,末尾添加如下信息
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
3.kafka-console-producer.sh文件的添加认证信息,内容如下
if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/config/kafka_client_jaas.conf"
4.打开consumer.properties文件末尾添加如下信息
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
5.kafka-console-consumer.sh文件的添加认证内容如下:
if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=usr/local/config/kafka_client_jaas.conf"
创建topic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1
./kafka-console-producer.sh --broker-list localhost:9092 --topic test1 --producer.config ../config/producer.properties
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --consumer.config ../config/consumer.properties
生产者发送消息,消费者能收到消息则代表配置成功。
Spring Boot 集成
1.新建客户端认证文件kafka_client_jaas.conf,内容如下
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
注意:客户端配置的用户名和密码需要与服务端(kafka_server_jaas.conf)文件指定的用户名和密码一致。
2.application.yml文件中添加kafka配置
kafka:
bootstrap-servers: localhost:9092
listener:
missing-topics-fatal: false
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
sasl:
mechanism: PLAIN
security:
protocol: SASL_PLAINTEXT
consumer:
group-id: test-consumer-group-01
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
sasl:
mechanism: PLAIN
security:
protocol: SASL_PLAINTEXT
client-id: test-01
3.编写生产者
@RestController
@RequestMapping("/kafka")
public class KafkaProducer
private static final Logger logger = LoggerFactory.getLogger(KafkaProducer.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping("/send")
public void send(){
Message message = new Message();
message.setId(UUID.randomUUID().toString());
message.setSendTime(new Date());
message.setContent("这是一个测试消息");
logger.info("--------------- message = {}", JSON.toJSONString(message));
kafkaTemplate.send("test", JSON.toJSONString(message));
4.编写消费者
@KafkaListener(topics = {"test"})
public void listen(ConsumerRecord<?, ?> record)
Optional<?> kafkaMessage = Optional.ofNullable(record.value())
if (kafkaMessage.isPresent())
Object message = kafkaMessage.get()
logger.info("====>接收到kafka消息: topic = {}, offset = {} \n", record.topic(), record.offset());
logger.info(" kafka message:{}" + message);
5.项目启动配置中加载客户端认证,eclipse配置如下:
-Djava.security.auth.login.config=E:\test\kafka_client_jaas.conf
可能遇到的错误
Kafka的SASL认证配置的过程比较复杂,其中也遇到了一些问题,我在这里列举了一些错误和相关的解决办法。
1.启动时报 Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
解决方案:项目启动时,需要添加认证信息,详细参考客户端配置第5条
2.failed authentication due to: Authentication failed: Invalid username or password
解决方案:检查客户端配置的用户名和密码是否与服务器端认证的用户名和密码一致。
3.生产者进程启动报错
解决方案:启动生产者测试需要添加配置文件
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --consumer.config ../config/consumer.properties
4.Exception while loading Zookeeper JAAS login context 'Client'
解决方案:zookeeper的启动需要添加SASL认证
Kafka的SASL认证的配置相对比较复杂,如果大家在配置的过程中有相关问题,可以及时反馈。